From a2958742f31bd52ff54578fda09ba03d5bad58ea Mon Sep 17 00:00:00 2001 From: hanish520 Date: Mon, 12 Jan 2026 01:03:56 -0800 Subject: [PATCH 1/5] initial commit for new client --- client/client.go | 77 ++++++-- go.mod | 1 + internal/proto/clientpb/client.pb.go | 176 +++++++++++++++--- internal/proto/clientpb/client.proto | 19 +- internal/proto/clientpb/client_gorums.pb.go | 95 +++++----- internal/proto/hotstuffpb/hotstuff.pb.go | 12 +- .../proto/hotstuffpb/hotstuff_gorums.pb.go | 10 +- server/clientio.go | 163 +++++++++++++--- types.go | 10 + 9 files changed, 434 insertions(+), 129 deletions(-) diff --git a/client/client.go b/client/client.go index 7af896a90..7ff24768a 100644 --- a/client/client.go +++ b/client/client.go @@ -9,6 +9,8 @@ import ( "errors" "io" "math" + "math/rand" + "strings" "sync" "time" @@ -41,7 +43,6 @@ func (q *qspec) ExecCommandQF(_ *clientpb.Command, signatures map[uint32]*emptyp type pendingCmd struct { sequenceNumber uint64 sendTime time.Time - promise *clientpb.AsyncEmpty cancelCtx context.CancelFunc } @@ -182,6 +183,8 @@ func (c *Client) Stop() { } func (c *Client) close() { + // Signal the command handler to stop fetching statuses before closing the manager. 
+ close(c.pendingCmds) c.mgr.Close() err := c.reader.Close() if err != nil { @@ -238,8 +241,8 @@ loop: } ctx, cancel := context.WithTimeout(ctx, c.timeout) - promise := c.gorumsConfig.ExecCommand(ctx, cmd) - pending := pendingCmd{sequenceNumber: num, sendTime: time.Now(), promise: promise, cancelCtx: cancel} + c.gorumsConfig.ExecCommand(ctx, cmd) + pending := pendingCmd{sequenceNumber: num, sendTime: time.Now(), cancelCtx: cancel} num++ select { @@ -257,6 +260,52 @@ loop: return nil } +func (c *Client) fetchCommandStatus(sequenceNumber uint64) hotstuff.CommandStatus { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + timeout := time.After(c.timeout) + + for { + select { + case <-ticker.C: + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + cmd := &clientpb.Command{ + ClientID: uint32(c.id), + SequenceNumber: sequenceNumber, + } + nodes := c.gorumsConfig.Nodes() + if len(nodes) == 0 { + c.logger.Error("No nodes available in gorums config") + cancel() + continue + } + node := nodes[rand.Intn(len(nodes))] + response, err := node.CommandStatus(ctx, cmd) + cancel() + + if err != nil { + c.logger.Errorf("Failed to fetch command status (client: %d, sequence: %d): %v", c.id, sequenceNumber, err) + // If the node/manager was closed, stop trying and return UNKNOWN. 
+ if strings.Contains(err.Error(), "node closed") { + return hotstuff.UNKNOWN + } + continue + } + if response == nil || response.Command == nil { + c.logger.Errorf("Invalid response received when fetching command status (client: %d, sequence: %d)", c.id, sequenceNumber) + continue + } + c.logger.Infof("Fetched command status (client: %d, sequence: %d, status: %d)", c.id, sequenceNumber, response.Status) + status := hotstuff.CommandStatus(response.Status) + if status == hotstuff.COMMITTED || status == hotstuff.EXECUTED || status == hotstuff.FAILED { + return status + } + case <-timeout: + return hotstuff.UNKNOWN + } + } +} + // handleCommands will get pending commands from the pendingCmds channel and then // handle them as they become acknowledged by the replicas. We expect the commands to be // acknowledged in the order that they were sent. @@ -274,16 +323,17 @@ func (c *Client) handleCommands(ctx context.Context) (executed, failed, timeout case <-ctx.Done(): return } - _, err := cmd.promise.Get() - if err != nil { - if errors.Is(err, context.DeadlineExceeded) { - c.logger.Debug("Command timed out.") - timeout++ - } else if !errors.Is(err, context.Canceled) { - c.logger.Debugf("Did not get enough replies for command: %v\n", err) - failed++ - } - } else { + response := c.fetchCommandStatus(cmd.sequenceNumber) + + switch response { + case hotstuff.UNKNOWN: + c.logger.Infof("Command timed out (client: %d, sequence: %d)", c.id, cmd.sequenceNumber) + timeout++ + case hotstuff.FAILED: + c.logger.Infof("Command failed (client: %d, sequence: %d)", c.id, cmd.sequenceNumber) + failed++ + default: + c.logger.Infof("Command executed (client: %d, sequence: %d)", c.id, cmd.sequenceNumber) executed++ } c.mut.Lock() @@ -291,7 +341,6 @@ func (c *Client) handleCommands(ctx context.Context) (executed, failed, timeout c.highestCommitted = cmd.sequenceNumber } c.mut.Unlock() - duration := time.Since(cmd.sendTime) c.eventLoop.AddEvent(LatencyMeasurementEvent{Latency: duration}) } diff 
--git a/go.mod b/go.mod index 43264eae6..664d5d6b8 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.25.6 require ( cuelang.org/go v0.15.4 github.com/felixge/fgprof v0.9.5 + github.com/golang/protobuf v1.5.4 github.com/google/go-cmp v0.7.0 github.com/kilic/bls12-381 v0.1.1-0.20210208205449-6045b0235e36 github.com/mroth/weightedrand v1.0.0 diff --git a/internal/proto/clientpb/client.pb.go b/internal/proto/clientpb/client.pb.go index 6ed28beb8..ba8b7d1c4 100644 --- a/internal/proto/clientpb/client.pb.go +++ b/internal/proto/clientpb/client.pb.go @@ -7,10 +7,10 @@ package clientpb import ( + empty "github.com/golang/protobuf/ptypes/empty" _ "github.com/relab/gorums" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" - emptypb "google.golang.org/protobuf/types/known/emptypb" reflect "reflect" sync "sync" unsafe "unsafe" @@ -23,6 +23,113 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +type CommandStatusResponse_Status int32 + +const ( + CommandStatusResponse_UNKNOWN CommandStatusResponse_Status = 0 + CommandStatusResponse_PENDING CommandStatusResponse_Status = 1 + CommandStatusResponse_COMMITTED CommandStatusResponse_Status = 2 + CommandStatusResponse_EXECUTED CommandStatusResponse_Status = 3 + CommandStatusResponse_FAILED CommandStatusResponse_Status = 4 +) + +// Enum value maps for CommandStatusResponse_Status. 
+var ( + CommandStatusResponse_Status_name = map[int32]string{ + 0: "UNKNOWN", + 1: "PENDING", + 2: "COMMITTED", + 3: "EXECUTED", + 4: "FAILED", + } + CommandStatusResponse_Status_value = map[string]int32{ + "UNKNOWN": 0, + "PENDING": 1, + "COMMITTED": 2, + "EXECUTED": 3, + "FAILED": 4, + } +) + +func (x CommandStatusResponse_Status) Enum() *CommandStatusResponse_Status { + p := new(CommandStatusResponse_Status) + *p = x + return p +} + +func (x CommandStatusResponse_Status) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (CommandStatusResponse_Status) Descriptor() protoreflect.EnumDescriptor { + return file_internal_proto_clientpb_client_proto_enumTypes[0].Descriptor() +} + +func (CommandStatusResponse_Status) Type() protoreflect.EnumType { + return &file_internal_proto_clientpb_client_proto_enumTypes[0] +} + +func (x CommandStatusResponse_Status) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use CommandStatusResponse_Status.Descriptor instead. 
+func (CommandStatusResponse_Status) EnumDescriptor() ([]byte, []int) { + return file_internal_proto_clientpb_client_proto_rawDescGZIP(), []int{0, 0} +} + +type CommandStatusResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Status CommandStatusResponse_Status `protobuf:"varint,1,opt,name=status,proto3,enum=clientpb.CommandStatusResponse_Status" json:"status,omitempty"` + Command *Command `protobuf:"bytes,2,opt,name=command,proto3" json:"command,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *CommandStatusResponse) Reset() { + *x = CommandStatusResponse{} + mi := &file_internal_proto_clientpb_client_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *CommandStatusResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CommandStatusResponse) ProtoMessage() {} + +func (x *CommandStatusResponse) ProtoReflect() protoreflect.Message { + mi := &file_internal_proto_clientpb_client_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CommandStatusResponse.ProtoReflect.Descriptor instead. +func (*CommandStatusResponse) Descriptor() ([]byte, []int) { + return file_internal_proto_clientpb_client_proto_rawDescGZIP(), []int{0} +} + +func (x *CommandStatusResponse) GetStatus() CommandStatusResponse_Status { + if x != nil { + return x.Status + } + return CommandStatusResponse_UNKNOWN +} + +func (x *CommandStatusResponse) GetCommand() *Command { + if x != nil { + return x.Command + } + return nil +} + // Command is the request that is sent to the HotStuff replicas with the data to // be executed. 
type Command struct { @@ -36,7 +143,7 @@ type Command struct { func (x *Command) Reset() { *x = Command{} - mi := &file_internal_proto_clientpb_client_proto_msgTypes[0] + mi := &file_internal_proto_clientpb_client_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -48,7 +155,7 @@ func (x *Command) String() string { func (*Command) ProtoMessage() {} func (x *Command) ProtoReflect() protoreflect.Message { - mi := &file_internal_proto_clientpb_client_proto_msgTypes[0] + mi := &file_internal_proto_clientpb_client_proto_msgTypes[1] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -61,7 +168,7 @@ func (x *Command) ProtoReflect() protoreflect.Message { // Deprecated: Use Command.ProtoReflect.Descriptor instead. func (*Command) Descriptor() ([]byte, []int) { - return file_internal_proto_clientpb_client_proto_rawDescGZIP(), []int{0} + return file_internal_proto_clientpb_client_proto_rawDescGZIP(), []int{1} } func (x *Command) GetClientID() uint32 { @@ -95,7 +202,7 @@ type Batch struct { func (x *Batch) Reset() { *x = Batch{} - mi := &file_internal_proto_clientpb_client_proto_msgTypes[1] + mi := &file_internal_proto_clientpb_client_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -107,7 +214,7 @@ func (x *Batch) String() string { func (*Batch) ProtoMessage() {} func (x *Batch) ProtoReflect() protoreflect.Message { - mi := &file_internal_proto_clientpb_client_proto_msgTypes[1] + mi := &file_internal_proto_clientpb_client_proto_msgTypes[2] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -120,7 +227,7 @@ func (x *Batch) ProtoReflect() protoreflect.Message { // Deprecated: Use Batch.ProtoReflect.Descriptor instead. 
func (*Batch) Descriptor() ([]byte, []int) { - return file_internal_proto_clientpb_client_proto_rawDescGZIP(), []int{1} + return file_internal_proto_clientpb_client_proto_rawDescGZIP(), []int{2} } func (x *Batch) GetCommands() []*Command { @@ -134,15 +241,26 @@ var File_internal_proto_clientpb_client_proto protoreflect.FileDescriptor const file_internal_proto_clientpb_client_proto_rawDesc = "" + "\n" + - "$internal/proto/clientpb/client.proto\x12\bclientpb\x1a\fgorums.proto\x1a\x1bgoogle/protobuf/empty.proto\"a\n" + + "$internal/proto/clientpb/client.proto\x12\bclientpb\x1a\fgorums.proto\x1a\x1bgoogle/protobuf/empty.proto\"\xd1\x01\n" + + "\x15CommandStatusResponse\x12>\n" + + "\x06status\x18\x01 \x01(\x0e2&.clientpb.CommandStatusResponse.StatusR\x06status\x12+\n" + + "\acommand\x18\x02 \x01(\v2\x11.clientpb.CommandR\acommand\"K\n" + + "\x06Status\x12\v\n" + + "\aUNKNOWN\x10\x00\x12\v\n" + + "\aPENDING\x10\x01\x12\r\n" + + "\tCOMMITTED\x10\x02\x12\f\n" + + "\bEXECUTED\x10\x03\x12\n" + + "\n" + + "\x06FAILED\x10\x04\"a\n" + "\aCommand\x12\x1a\n" + "\bClientID\x18\x01 \x01(\rR\bClientID\x12&\n" + "\x0eSequenceNumber\x18\x02 \x01(\x04R\x0eSequenceNumber\x12\x12\n" + "\x04Data\x18\x03 \x01(\fR\x04Data\"6\n" + "\x05Batch\x12-\n" + - "\bCommands\x18\x01 \x03(\v2\x11.clientpb.CommandR\bCommands2L\n" + - "\x06Client\x12B\n" + - "\vExecCommand\x12\x11.clientpb.Command\x1a\x16.google.protobuf.Empty\"\b\xa0\xb5\x18\x01ะต\x18\x01B3Z1github.com/relab/hotstuff/internal/proto/clientpbb\x06proto3" + "\bCommands\x18\x01 \x03(\v2\x11.clientpb.CommandR\bCommands2\x8f\x01\n" + + "\x06Client\x12>\n" + + "\vExecCommand\x12\x11.clientpb.Command\x1a\x16.google.protobuf.Empty\"\x04\x98\xb5\x18\x01\x12E\n" + + "\rCommandStatus\x12\x11.clientpb.Command\x1a\x1f.clientpb.CommandStatusResponse\"\x00B3Z1github.com/relab/hotstuff/internal/proto/clientpbb\x06proto3" var ( file_internal_proto_clientpb_client_proto_rawDescOnce sync.Once @@ -156,21 +274,28 @@ func 
file_internal_proto_clientpb_client_proto_rawDescGZIP() []byte { return file_internal_proto_clientpb_client_proto_rawDescData } -var file_internal_proto_clientpb_client_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_internal_proto_clientpb_client_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_internal_proto_clientpb_client_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_internal_proto_clientpb_client_proto_goTypes = []any{ - (*Command)(nil), // 0: clientpb.Command - (*Batch)(nil), // 1: clientpb.Batch - (*emptypb.Empty)(nil), // 2: google.protobuf.Empty + (CommandStatusResponse_Status)(0), // 0: clientpb.CommandStatusResponse.Status + (*CommandStatusResponse)(nil), // 1: clientpb.CommandStatusResponse + (*Command)(nil), // 2: clientpb.Command + (*Batch)(nil), // 3: clientpb.Batch + (*empty.Empty)(nil), // 4: google.protobuf.Empty } var file_internal_proto_clientpb_client_proto_depIdxs = []int32{ - 0, // 0: clientpb.Batch.Commands:type_name -> clientpb.Command - 0, // 1: clientpb.Client.ExecCommand:input_type -> clientpb.Command - 2, // 2: clientpb.Client.ExecCommand:output_type -> google.protobuf.Empty - 2, // [2:3] is the sub-list for method output_type - 1, // [1:2] is the sub-list for method input_type - 1, // [1:1] is the sub-list for extension type_name - 1, // [1:1] is the sub-list for extension extendee - 0, // [0:1] is the sub-list for field type_name + 0, // 0: clientpb.CommandStatusResponse.status:type_name -> clientpb.CommandStatusResponse.Status + 2, // 1: clientpb.CommandStatusResponse.command:type_name -> clientpb.Command + 2, // 2: clientpb.Batch.Commands:type_name -> clientpb.Command + 2, // 3: clientpb.Client.ExecCommand:input_type -> clientpb.Command + 2, // 4: clientpb.Client.CommandStatus:input_type -> clientpb.Command + 4, // 5: clientpb.Client.ExecCommand:output_type -> google.protobuf.Empty + 1, // 6: clientpb.Client.CommandStatus:output_type -> clientpb.CommandStatusResponse + 5, // [5:7] is the sub-list 
for method output_type + 3, // [3:5] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name } func init() { file_internal_proto_clientpb_client_proto_init() } @@ -183,13 +308,14 @@ func file_internal_proto_clientpb_client_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_internal_proto_clientpb_client_proto_rawDesc), len(file_internal_proto_clientpb_client_proto_rawDesc)), - NumEnums: 0, - NumMessages: 2, + NumEnums: 1, + NumMessages: 3, NumExtensions: 0, NumServices: 1, }, GoTypes: file_internal_proto_clientpb_client_proto_goTypes, DependencyIndexes: file_internal_proto_clientpb_client_proto_depIdxs, + EnumInfos: file_internal_proto_clientpb_client_proto_enumTypes, MessageInfos: file_internal_proto_clientpb_client_proto_msgTypes, }.Build() File_internal_proto_clientpb_client_proto = out.File diff --git a/internal/proto/clientpb/client.proto b/internal/proto/clientpb/client.proto index 05badc4b2..59ed1e90a 100644 --- a/internal/proto/clientpb/client.proto +++ b/internal/proto/clientpb/client.proto @@ -12,11 +12,26 @@ service Client { // ExecCommand sends a command to all replicas and waits for valid signatures // from f+1 replicas rpc ExecCommand(Command) returns (google.protobuf.Empty) { - option (gorums.quorumcall) = true; - option (gorums.async) = true; + option (gorums.multicast) = true; } + + rpc CommandStatus(Command) returns (CommandStatusResponse) { + } +} + +message CommandStatusResponse { + enum Status { + UNKNOWN = 0; + PENDING = 1; + COMMITTED = 2; + EXECUTED = 3; + FAILED = 4; + } + Status status = 1; + Command command = 2; } + // Command is the request that is sent to the HotStuff replicas with the data to // be executed. 
message Command { diff --git a/internal/proto/clientpb/client_gorums.pb.go b/internal/proto/clientpb/client_gorums.pb.go index cd75440d8..baaa64c51 100644 --- a/internal/proto/clientpb/client_gorums.pb.go +++ b/internal/proto/clientpb/client_gorums.pb.go @@ -9,10 +9,9 @@ package clientpb import ( context "context" fmt "fmt" + empty "github.com/golang/protobuf/ptypes/empty" gorums "github.com/relab/gorums" encoding "google.golang.org/grpc/encoding" - proto "google.golang.org/protobuf/proto" - emptypb "google.golang.org/protobuf/types/known/emptypb" ) const ( @@ -150,75 +149,69 @@ type Node struct { *gorums.RawNode } +// ClientClient is the client interface for the Client service. +type ClientClient interface { + ExecCommand(ctx context.Context, in *Command, opts ...gorums.CallOption) +} + +// enforce interface compliance +var _ ClientClient = (*Configuration)(nil) + +// ClientNodeClient is the single node client interface for the Client service. +type ClientNodeClient interface { + CommandStatus(ctx context.Context, in *Command) (resp *CommandStatusResponse, err error) +} + +// enforce interface compliance +var _ ClientNodeClient = (*Node)(nil) + +// Reference imports to suppress errors if they are not otherwise used. 
+var _ empty.Empty + // ExecCommand sends a command to all replicas and waits for valid signatures // from f+1 replicas -func (c *Configuration) ExecCommand(ctx context.Context, in *Command) *AsyncEmpty { +func (c *Configuration) ExecCommand(ctx context.Context, in *Command, opts ...gorums.CallOption) { cd := gorums.QuorumCallData{ Message: in, Method: "clientpb.Client.ExecCommand", } - cd.QuorumFunction = func(req proto.Message, replies map[uint32]proto.Message) (proto.Message, bool) { - r := make(map[uint32]*emptypb.Empty, len(replies)) - for k, v := range replies { - r[k] = v.(*emptypb.Empty) - } - return c.qspec.ExecCommandQF(req.(*Command), r) - } - fut := c.RawConfiguration.AsyncCall(ctx, cd) - return &AsyncEmpty{fut} -} - -// ClientClient is the client interface for the Client service. -type ClientClient interface { - ExecCommand(ctx context.Context, in *Command) *AsyncEmpty + c.RawConfiguration.Multicast(ctx, cd, opts...) } -// enforce interface compliance -var _ ClientClient = (*Configuration)(nil) +// There are no quorum calls. +type QuorumSpec interface{} -// QuorumSpec is the interface of quorum functions for Client. -type QuorumSpec interface { - gorums.ConfigOption +// CommandStatus is a quorum call invoked on all nodes in configuration c, +// with the same argument in, and returns a combined result. +func (n *Node) CommandStatus(ctx context.Context, in *Command) (resp *CommandStatusResponse, err error) { + cd := gorums.CallData{ + Message: in, + Method: "clientpb.Client.CommandStatus", + } - // ExecCommandQF is the quorum function for the ExecCommand - // asynchronous quorum call method. The in parameter is the request object - // supplied to the ExecCommand method at call time, and may or may not - // be used by the quorum function. If the in parameter is not needed - // you should implement your quorum function with '_ *Command'. 
- ExecCommandQF(in *Command, replies map[uint32]*emptypb.Empty) (*emptypb.Empty, bool) + res, err := n.RawNode.RPCCall(ctx, cd) + if err != nil { + return nil, err + } + return res.(*CommandStatusResponse), err } // Client is the server-side API for the Client Service type ClientServer interface { - ExecCommand(ctx gorums.ServerCtx, request *Command) (response *emptypb.Empty, err error) + ExecCommand(ctx gorums.ServerCtx, request *Command) + CommandStatus(ctx gorums.ServerCtx, request *Command) (response *CommandStatusResponse, err error) } func RegisterClientServer(srv *gorums.Server, impl ClientServer) { srv.RegisterHandler("clientpb.Client.ExecCommand", func(ctx gorums.ServerCtx, in *gorums.Message) (*gorums.Message, error) { req := gorums.AsProto[*Command](in) - resp, err := impl.ExecCommand(ctx, req) + impl.ExecCommand(ctx, req) + return nil, nil + }) + srv.RegisterHandler("clientpb.Client.CommandStatus", func(ctx gorums.ServerCtx, in *gorums.Message) (*gorums.Message, error) { + req := gorums.AsProto[*Command](in) + resp, err := impl.CommandStatus(ctx, req) return gorums.NewResponseMessage(in.GetMetadata(), resp), err }) } - -type internalEmpty struct { - nid uint32 - reply *emptypb.Empty - err error -} - -// AsyncEmpty is a async object for processing replies. -type AsyncEmpty struct { - *gorums.Async -} - -// Get returns the reply and any error associated with the called method. -// The method blocks until a reply or error is available. 
-func (f *AsyncEmpty) Get() (*emptypb.Empty, error) { - resp, err := f.Async.Get() - if err != nil { - return nil, err - } - return resp.(*emptypb.Empty), err -} diff --git a/internal/proto/hotstuffpb/hotstuff.pb.go b/internal/proto/hotstuffpb/hotstuff.pb.go index dad5fc927..b63c1f474 100644 --- a/internal/proto/hotstuffpb/hotstuff.pb.go +++ b/internal/proto/hotstuffpb/hotstuff.pb.go @@ -7,12 +7,12 @@ package hotstuffpb import ( + empty "github.com/golang/protobuf/ptypes/empty" + timestamp "github.com/golang/protobuf/ptypes/timestamp" _ "github.com/relab/gorums" clientpb "github.com/relab/hotstuff/internal/proto/clientpb" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" - emptypb "google.golang.org/protobuf/types/known/emptypb" - timestamppb "google.golang.org/protobuf/types/known/timestamppb" reflect "reflect" sync "sync" unsafe "unsafe" @@ -128,7 +128,7 @@ type Block struct { View uint64 `protobuf:"varint,3,opt,name=View,proto3" json:"View,omitempty"` Commands *clientpb.Batch `protobuf:"bytes,4,opt,name=Commands,proto3" json:"Commands,omitempty"` Proposer uint32 `protobuf:"varint,5,opt,name=Proposer,proto3" json:"Proposer,omitempty"` - Timestamp *timestamppb.Timestamp `protobuf:"bytes,6,opt,name=Timestamp,proto3" json:"Timestamp,omitempty"` + Timestamp *timestamp.Timestamp `protobuf:"bytes,6,opt,name=Timestamp,proto3" json:"Timestamp,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -198,7 +198,7 @@ func (x *Block) GetProposer() uint32 { return 0 } -func (x *Block) GetTimestamp() *timestamppb.Timestamp { +func (x *Block) GetTimestamp() *timestamp.Timestamp { if x != nil { return x.Timestamp } @@ -1152,8 +1152,8 @@ var file_internal_proto_hotstuffpb_hotstuff_proto_goTypes = []any{ (*AggQC)(nil), // 16: hotstuffpb.AggQC nil, // 17: hotstuffpb.AggQC.QCsEntry (*clientpb.Batch)(nil), // 18: clientpb.Batch - (*timestamppb.Timestamp)(nil), // 19: 
google.protobuf.Timestamp - (*emptypb.Empty)(nil), // 20: google.protobuf.Empty + (*timestamp.Timestamp)(nil), // 19: google.protobuf.Timestamp + (*empty.Empty)(nil), // 20: google.protobuf.Empty } var file_internal_proto_hotstuffpb_hotstuff_proto_depIdxs = []int32{ 2, // 0: hotstuffpb.Proposal.Block:type_name -> hotstuffpb.Block diff --git a/internal/proto/hotstuffpb/hotstuff_gorums.pb.go b/internal/proto/hotstuffpb/hotstuff_gorums.pb.go index f0545f18e..efcaad5c9 100644 --- a/internal/proto/hotstuffpb/hotstuff_gorums.pb.go +++ b/internal/proto/hotstuffpb/hotstuff_gorums.pb.go @@ -9,10 +9,10 @@ package hotstuffpb import ( context "context" fmt "fmt" + empty "github.com/golang/protobuf/ptypes/empty" gorums "github.com/relab/gorums" encoding "google.golang.org/grpc/encoding" proto "google.golang.org/protobuf/proto" - emptypb "google.golang.org/protobuf/types/known/emptypb" ) const ( @@ -170,7 +170,7 @@ type ConsensusNodeClient interface { var _ ConsensusNodeClient = (*Node)(nil) // Reference imports to suppress errors if they are not otherwise used. -var _ emptypb.Empty +var _ empty.Empty // Propose is a quorum call invoked on all nodes in configuration c, // with the same argument in, and returns a combined result. @@ -184,7 +184,7 @@ func (c *Configuration) Propose(ctx context.Context, in *Proposal, opts ...gorum } // Reference imports to suppress errors if they are not otherwise used. -var _ emptypb.Empty +var _ empty.Empty // Timeout is a quorum call invoked on all nodes in configuration c, // with the same argument in, and returns a combined result. @@ -275,7 +275,7 @@ type internalBlock struct { } // Reference imports to suppress errors if they are not otherwise used. -var _ emptypb.Empty +var _ empty.Empty // Vote is a quorum call invoked on all nodes in configuration c, // with the same argument in, and returns a combined result. 
@@ -289,7 +289,7 @@ func (n *Node) Vote(ctx context.Context, in *PartialCert, opts ...gorums.CallOpt } // Reference imports to suppress errors if they are not otherwise used. -var _ emptypb.Empty +var _ empty.Empty // NewView is a quorum call invoked on all nodes in configuration c, // with the same argument in, and returns a combined result. diff --git a/server/clientio.go b/server/clientio.go index 8f88871e6..710fe6d8e 100644 --- a/server/clientio.go +++ b/server/clientio.go @@ -7,14 +7,121 @@ import ( "sync" "github.com/relab/gorums" + "github.com/relab/hotstuff" "github.com/relab/hotstuff/core/eventloop" "github.com/relab/hotstuff/core/logging" "github.com/relab/hotstuff/internal/proto/clientpb" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "google.golang.org/protobuf/types/known/emptypb" ) +// clientStatusWindow tracks command statuses for a single client using an array-based approach. +// This is more memory-efficient than nested maps, especially for increasing sequence numbers. +type clientStatusWindow struct { + baseSeqNum uint64 // starting sequence number for the current window + statuses []hotstuff.CommandStatus // array indexed by (seqNum - baseSeqNum) +} + +// CommandStatusTracker efficiently tracks command status per client per sequence number. +// Uses array-based storage per client to avoid the overhead of nested maps. +// Supports sliding window cleanup to prevent unbounded memory growth. 
+type CommandStatusTracker struct { + // clientWindows maps ClientID -> client status window + clientWindows map[uint32]*clientStatusWindow +} + +// NewCommandStatusTracker creates a new status tracker +func NewCommandStatusTracker() *CommandStatusTracker { + return &CommandStatusTracker{ + clientWindows: make(map[uint32]*clientStatusWindow), + } +} + +// SetStatus sets the status of a command +func (cst *CommandStatusTracker) SetStatus(clientID uint32, seqNum uint64, status hotstuff.CommandStatus) { + window, exists := cst.clientWindows[clientID] + if !exists { + // Initialize new window for this client, starting at this sequence number + window = &clientStatusWindow{ + baseSeqNum: seqNum, + statuses: make([]hotstuff.CommandStatus, 5000), + } + cst.clientWindows[clientID] = window + window.statuses[0] = status + return + } + + // Check if seqNum is within current window + if seqNum >= window.baseSeqNum { + index := seqNum - window.baseSeqNum + // Extend array if necessary + if index >= uint64(len(window.statuses)) { + // Grow array by 50% or enough to fit the new index, whichever is larger + newLen := len(window.statuses) + len(window.statuses)/2 + 1 + if int(index) >= newLen { + newLen = int(index) + 1 + } + newStatuses := make([]hotstuff.CommandStatus, newLen) + copy(newStatuses, window.statuses) + window.statuses = newStatuses + } + window.statuses[index] = status + } + // Ignore updates for seqNum < baseSeqNum (already cleaned up) +} + +// GetStatus retrieves the status of a command. Returns hotstuff.EXECUTED if the client or sequence number is not tracked. 
+func (cst *CommandStatusTracker) GetStatus(clientID uint32, seqNum uint64) hotstuff.CommandStatus { + window, exists := cst.clientWindows[clientID] + if !exists { + return hotstuff.EXECUTED + } + + // Check if seqNum is within current window + if seqNum >= window.baseSeqNum && seqNum < window.baseSeqNum+uint64(len(window.statuses)) { + index := seqNum - window.baseSeqNum + return window.statuses[index] + } + + // If outside window (cleaned up or not yet added), assume executed + return hotstuff.EXECUTED +} + +// Cleanup removes entries for sequence numbers less than or equal to the given threshold per client. +// This prevents unbounded memory growth by sliding the window forward. +func (cst *CommandStatusTracker) Cleanup(clientID uint32, upToSeqNum uint64) { + window, exists := cst.clientWindows[clientID] + if !exists { + return + } + + // Calculate how many entries to remove from the front + if upToSeqNum >= window.baseSeqNum { + entriesToRemove := int(upToSeqNum - window.baseSeqNum + 1) + if entriesToRemove >= len(window.statuses) { + // Remove entire window, clean up the client entry + delete(cst.clientWindows, clientID) + return + } + + // Slide the window forward + window.statuses = window.statuses[entriesToRemove:] + window.baseSeqNum = upToSeqNum + 1 + } +} + +// GetClientStatuses returns a snapshot of all statuses for a given client (for testing/debugging) +func (cst *CommandStatusTracker) GetClientStatuses(clientID uint32) map[uint64]hotstuff.CommandStatus { + window, exists := cst.clientWindows[clientID] + if !exists { + return make(map[uint64]hotstuff.CommandStatus) + } + + snapshot := make(map[uint64]hotstuff.CommandStatus, len(window.statuses)) + for i, status := range window.statuses { + snapshot[window.baseSeqNum+uint64(i)] = status + } + return snapshot +} + // ClientIO serves a client. 
type ClientIO struct { logger logging.Logger @@ -26,7 +133,9 @@ type ClientIO struct { hash hash.Hash cmdCount uint32 - lastExecutedSeqNum map[uint32]uint64 // highest executed sequence number per client ID + lastExecutedSeqNum map[uint32]uint64 // highest executed sequence number per client ID + statusTracker *CommandStatusTracker // tracks status of all commands (executed/aborted/failed) + } // NewClientIO returns a new client IO server. @@ -40,10 +149,10 @@ func NewClientIO( logger: logger, cmdCache: cmdCache, - awaitingCmds: make(map[clientpb.MessageID]chan<- error), srv: gorums.NewServer(srvOpts...), hash: sha256.New(), lastExecutedSeqNum: make(map[uint32]uint64), + statusTracker: NewCommandStatusTracker(), } clientpb.RegisterClientServer(srv.srv, srv) eventloop.Register(el, func(event clientpb.ExecuteEvent) { @@ -76,35 +185,26 @@ func (srv *ClientIO) CmdCount() uint32 { return srv.cmdCount } -func (srv *ClientIO) ExecCommand(ctx gorums.ServerCtx, cmd *clientpb.Command) (*emptypb.Empty, error) { - id := cmd.ID() - errChan := make(chan error) - - srv.mut.Lock() - srv.awaitingCmds[id] = errChan - srv.mut.Unlock() - +func (srv *ClientIO) ExecCommand(ctx gorums.ServerCtx, cmd *clientpb.Command) { srv.cmdCache.Add(cmd) + srv.statusTracker.SetStatus(cmd.ClientID, cmd.SequenceNumber, hotstuff.UNKNOWN) ctx.Release() - err := <-errChan - return &emptypb.Empty{}, err } func (srv *ClientIO) Exec(batch *clientpb.Batch) { for _, cmd := range batch.GetCommands() { - id := cmd.ID() srv.mut.Lock() if srv.isDuplicate(cmd) { srv.logger.Info("duplicate command found") - srv.completeCommand(id, status.Error(codes.Aborted, "command already executed")) srv.mut.Unlock() continue } srv.lastExecutedSeqNum[cmd.ClientID] = cmd.SequenceNumber + // Mark command as executed in status tracker + srv.statusTracker.SetStatus(cmd.ClientID, cmd.SequenceNumber, hotstuff.EXECUTED) _, _ = srv.hash.Write(cmd.Data) srv.cmdCount++ - srv.completeCommand(id, nil) srv.mut.Unlock() } 
srv.logger.Debugf("Hash: %.8x", srv.hash.Sum(nil)) @@ -113,7 +213,8 @@ func (srv *ClientIO) Exec(batch *clientpb.Batch) { func (srv *ClientIO) Abort(batch *clientpb.Batch) { for _, cmd := range batch.GetCommands() { srv.mut.Lock() - srv.completeCommand(cmd.ID(), status.Error(codes.Aborted, "blockchain was forked")) + // Mark command as aborted in status tracker + srv.statusTracker.SetStatus(cmd.ClientID, cmd.SequenceNumber, hotstuff.FAILED) srv.mut.Unlock() } } @@ -125,11 +226,21 @@ func (srv *ClientIO) isDuplicate(cmd *clientpb.Command) bool { return ok && seqNum >= cmd.SequenceNumber } -// completeCommand sends an error or nil to the awaiting client's error channel. -// The caller must hold srv.mut.Lock(). -func (srv *ClientIO) completeCommand(id clientpb.MessageID, err error) { - if errChan, ok := srv.awaitingCmds[id]; ok { - errChan <- err - delete(srv.awaitingCmds, id) - } +// CleanupOldStatuses removes command status entries up to and including the given sequence number +// for a specific client. This should be called periodically to prevent unbounded memory growth. 
+func (srv *ClientIO) CleanupOldStatuses(clientID uint32, upToSeqNum uint64) { + srv.mut.Lock() + defer srv.mut.Unlock() + srv.statusTracker.Cleanup(clientID, upToSeqNum) +} + +func (srv *ClientIO) CommandStatus(ctx gorums.ServerCtx, in *clientpb.Command) (resp *clientpb.CommandStatusResponse, err error) { + srv.mut.Lock() + defer srv.mut.Unlock() + status := srv.statusTracker.GetStatus(in.ClientID, in.SequenceNumber) + srv.logger.Infof("Received CommandStatus request (client: %d, sequence: %d, status: %d)", in.ClientID, in.SequenceNumber, status) + return &clientpb.CommandStatusResponse{ + Status: clientpb.CommandStatusResponse_Status(status), + Command: in, + }, nil } diff --git a/types.go b/types.go index e3d78fab8..bae2baa35 100644 --- a/types.go +++ b/types.go @@ -373,3 +373,13 @@ type ReplicaInfo struct { Location string Metadata map[string]string } + +type CommandStatus uint8 + +const ( + UNKNOWN CommandStatus = iota + PENDING + COMMITTED + EXECUTED + FAILED +) From 90c62372eb238bd62aee440129217dfa2d2b54f4 Mon Sep 17 00:00:00 2001 From: hanish520 Date: Mon, 12 Jan 2026 23:42:08 -0800 Subject: [PATCH 2/5] changed the commandstatus to quorum call --- client/client.go | 40 +++++++++++++------- internal/proto/clientpb/client.pb.go | 6 +-- internal/proto/clientpb/client.proto | 1 + internal/proto/clientpb/client_gorums.pb.go | 42 ++++++++++++++------- 4 files changed, 60 insertions(+), 29 deletions(-) diff --git a/client/client.go b/client/client.go index 7ff24768a..750968c00 100644 --- a/client/client.go +++ b/client/client.go @@ -9,7 +9,6 @@ import ( "errors" "io" "math" - "math/rand" "strings" "sync" "time" @@ -23,7 +22,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" - "google.golang.org/protobuf/types/known/emptypb" ) // ID is the identifier for a client. 
@@ -33,11 +31,32 @@ type qspec struct { faulty int } -func (q *qspec) ExecCommandQF(_ *clientpb.Command, signatures map[uint32]*emptypb.Empty) (*emptypb.Empty, bool) { - if len(signatures) < q.faulty+1 { +func (q *qspec) CommandStatusQF(command *clientpb.Command, replies map[uint32]*clientpb.CommandStatusResponse) (*clientpb.CommandStatusResponse, bool) { + if len(replies) < q.faulty+1 { return nil, false } - return &emptypb.Empty{}, true + responseCount := make([]int, 4) // assuming 4 possible statuses + + for _, resp := range replies { + if resp != nil { + status := resp.Status + if int(status) >= 0 && int(status) < len(responseCount) { + responseCount[int(status)]++ + } + } + } + for status, count := range responseCount { + if count >= q.faulty+1 { + return &clientpb.CommandStatusResponse{ + Command: command, + Status: clientpb.CommandStatusResponse_Status(status), + }, true + } + } + return &clientpb.CommandStatusResponse{ + Command: command, + Status: clientpb.CommandStatusResponse_PENDING, + }, false } type pendingCmd struct { @@ -126,6 +145,7 @@ func (c *Client) Connect(replicas []hotstuff.ReplicaInfo) (err error) { } c.gorumsConfig, err = c.mgr.NewConfiguration(&qspec{faulty: hotstuff.NumFaulty(len(replicas))}, gorums.WithNodeMap(nodes)) if err != nil { + c.logger.Error("unable to create the configuration in client") c.mgr.Close() return err } @@ -273,14 +293,8 @@ func (c *Client) fetchCommandStatus(sequenceNumber uint64) hotstuff.CommandStatu ClientID: uint32(c.id), SequenceNumber: sequenceNumber, } - nodes := c.gorumsConfig.Nodes() - if len(nodes) == 0 { - c.logger.Error("No nodes available in gorums config") - cancel() - continue - } - node := nodes[rand.Intn(len(nodes))] - response, err := node.CommandStatus(ctx, cmd) + + response, err := c.gorumsConfig.CommandStatus(ctx, cmd) cancel() if err != nil { diff --git a/internal/proto/clientpb/client.pb.go b/internal/proto/clientpb/client.pb.go index ba8b7d1c4..01be25d45 100644 --- 
a/internal/proto/clientpb/client.pb.go +++ b/internal/proto/clientpb/client.pb.go @@ -257,10 +257,10 @@ const file_internal_proto_clientpb_client_proto_rawDesc = "" + "\x0eSequenceNumber\x18\x02 \x01(\x04R\x0eSequenceNumber\x12\x12\n" + "\x04Data\x18\x03 \x01(\fR\x04Data\"6\n" + "\x05Batch\x12-\n" + - "\bCommands\x18\x01 \x03(\v2\x11.clientpb.CommandR\bCommands2\x8f\x01\n" + + "\bCommands\x18\x01 \x03(\v2\x11.clientpb.CommandR\bCommands2\x93\x01\n" + "\x06Client\x12>\n" + - "\vExecCommand\x12\x11.clientpb.Command\x1a\x16.google.protobuf.Empty\"\x04\x98\xb5\x18\x01\x12E\n" + - "\rCommandStatus\x12\x11.clientpb.Command\x1a\x1f.clientpb.CommandStatusResponse\"\x00B3Z1github.com/relab/hotstuff/internal/proto/clientpbb\x06proto3" + "\vExecCommand\x12\x11.clientpb.Command\x1a\x16.google.protobuf.Empty\"\x04\x98\xb5\x18\x01\x12I\n" + + "\rCommandStatus\x12\x11.clientpb.Command\x1a\x1f.clientpb.CommandStatusResponse\"\x04\xa0\xb5\x18\x01B3Z1github.com/relab/hotstuff/internal/proto/clientpbb\x06proto3" var ( file_internal_proto_clientpb_client_proto_rawDescOnce sync.Once diff --git a/internal/proto/clientpb/client.proto b/internal/proto/clientpb/client.proto index 59ed1e90a..6b3258e68 100644 --- a/internal/proto/clientpb/client.proto +++ b/internal/proto/clientpb/client.proto @@ -16,6 +16,7 @@ service Client { } rpc CommandStatus(Command) returns (CommandStatusResponse) { + option (gorums.quorumcall) = true; } } diff --git a/internal/proto/clientpb/client_gorums.pb.go b/internal/proto/clientpb/client_gorums.pb.go index baaa64c51..5d02ee5ec 100644 --- a/internal/proto/clientpb/client_gorums.pb.go +++ b/internal/proto/clientpb/client_gorums.pb.go @@ -12,6 +12,7 @@ import ( empty "github.com/golang/protobuf/ptypes/empty" gorums "github.com/relab/gorums" encoding "google.golang.org/grpc/encoding" + proto "google.golang.org/protobuf/proto" ) const ( @@ -152,18 +153,11 @@ type Node struct { // ClientClient is the client interface for the Client service. 
type ClientClient interface { ExecCommand(ctx context.Context, in *Command, opts ...gorums.CallOption) -} - -// enforce interface compliance -var _ ClientClient = (*Configuration)(nil) - -// ClientNodeClient is the single node client interface for the Client service. -type ClientNodeClient interface { CommandStatus(ctx context.Context, in *Command) (resp *CommandStatusResponse, err error) } // enforce interface compliance -var _ ClientNodeClient = (*Node)(nil) +var _ ClientClient = (*Configuration)(nil) // Reference imports to suppress errors if they are not otherwise used. var _ empty.Empty @@ -179,18 +173,34 @@ func (c *Configuration) ExecCommand(ctx context.Context, in *Command, opts ...go c.RawConfiguration.Multicast(ctx, cd, opts...) } -// There are no quorum calls. -type QuorumSpec interface{} +// QuorumSpec is the interface of quorum functions for Client. +type QuorumSpec interface { + gorums.ConfigOption + + // CommandStatusQF is the quorum function for the CommandStatus + // quorum call method. The in parameter is the request object + // supplied to the CommandStatus method at call time, and may or may not + // be used by the quorum function. If the in parameter is not needed + // you should implement your quorum function with '_ *Command'. + CommandStatusQF(in *Command, replies map[uint32]*CommandStatusResponse) (*CommandStatusResponse, bool) +} // CommandStatus is a quorum call invoked on all nodes in configuration c, // with the same argument in, and returns a combined result. 
-func (n *Node) CommandStatus(ctx context.Context, in *Command) (resp *CommandStatusResponse, err error) { - cd := gorums.CallData{ +func (c *Configuration) CommandStatus(ctx context.Context, in *Command) (resp *CommandStatusResponse, err error) { + cd := gorums.QuorumCallData{ Message: in, Method: "clientpb.Client.CommandStatus", } + cd.QuorumFunction = func(req proto.Message, replies map[uint32]proto.Message) (proto.Message, bool) { + r := make(map[uint32]*CommandStatusResponse, len(replies)) + for k, v := range replies { + r[k] = v.(*CommandStatusResponse) + } + return c.qspec.CommandStatusQF(req.(*Command), r) + } - res, err := n.RawNode.RPCCall(ctx, cd) + res, err := c.RawConfiguration.QuorumCall(ctx, cd) if err != nil { return nil, err } @@ -215,3 +225,9 @@ func RegisterClientServer(srv *gorums.Server, impl ClientServer) { return gorums.NewResponseMessage(in.GetMetadata(), resp), err }) } + +type internalCommandStatusResponse struct { + nid uint32 + reply *CommandStatusResponse + err error +} From 659872d3629838cc6dd28f1c6ee1144c1f114808 Mon Sep 17 00:00:00 2001 From: hanish520 Date: Thu, 15 Jan 2026 23:36:44 -0800 Subject: [PATCH 3/5] added tests for command status tracker --- server/clientio.go | 11 ++-- server/clientio_test.go | 125 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 130 insertions(+), 6 deletions(-) create mode 100644 server/clientio_test.go diff --git a/server/clientio.go b/server/clientio.go index 710fe6d8e..113872297 100644 --- a/server/clientio.go +++ b/server/clientio.go @@ -127,11 +127,10 @@ type ClientIO struct { logger logging.Logger cmdCache *clientpb.CommandCache - mut sync.Mutex - srv *gorums.Server - awaitingCmds map[clientpb.MessageID]chan<- error - hash hash.Hash - cmdCount uint32 + mut sync.Mutex + srv *gorums.Server + hash hash.Hash + cmdCount uint32 lastExecutedSeqNum map[uint32]uint64 // highest executed sequence number per client ID statusTracker *CommandStatusTracker // tracks status of all commands 
(executed/aborted/failed) @@ -234,7 +233,7 @@ func (srv *ClientIO) CleanupOldStatuses(clientID uint32, upToSeqNum uint64) { srv.statusTracker.Cleanup(clientID, upToSeqNum) } -func (srv *ClientIO) CommandStatus(ctx gorums.ServerCtx, in *clientpb.Command) (resp *clientpb.CommandStatusResponse, err error) { +func (srv *ClientIO) CommandStatus(_ gorums.ServerCtx, in *clientpb.Command) (resp *clientpb.CommandStatusResponse, err error) { srv.mut.Lock() defer srv.mut.Unlock() status := srv.statusTracker.GetStatus(in.ClientID, in.SequenceNumber) diff --git a/server/clientio_test.go b/server/clientio_test.go new file mode 100644 index 000000000..1b9254409 --- /dev/null +++ b/server/clientio_test.go @@ -0,0 +1,125 @@ +package server + +import ( + "testing" + + "github.com/relab/hotstuff" +) + +func TestCommandStatusTracker_GetStatus_DefaultExecuted(t *testing.T) { + tr := NewCommandStatusTracker() + status := tr.GetStatus(1, 1) + if status != hotstuff.EXECUTED { + t.Fatalf("expected default status EXECUTED for unknown client, got %v", status) + } +} + +func TestCommandStatusTracker_SetAndGet(t *testing.T) { + tr := NewCommandStatusTracker() + clientID := uint32(1) + seq := uint64(10) + + tr.SetStatus(clientID, seq, hotstuff.UNKNOWN) + if got := tr.GetStatus(clientID, seq); got != hotstuff.UNKNOWN { + t.Fatalf("GetStatus = %v; want %v", got, hotstuff.UNKNOWN) + } +} + +func TestCommandStatusTracker_ExtendWindow(t *testing.T) { + tr := NewCommandStatusTracker() + clientID := uint32(2) + + // initial set at seq 1 + tr.SetStatus(clientID, 1, hotstuff.UNKNOWN) + if got := tr.GetStatus(clientID, 1); got != hotstuff.UNKNOWN { + t.Fatalf("initial GetStatus = %v; want %v", got, hotstuff.UNKNOWN) + } + + // set far ahead to force window growth + largeSeq := uint64(6000) + tr.SetStatus(clientID, largeSeq, hotstuff.FAILED) + if got := tr.GetStatus(clientID, largeSeq); got != hotstuff.FAILED { + t.Fatalf("after extend GetStatus(%d) = %v; want %v", largeSeq, got, hotstuff.FAILED) + } + + 
// earlier entry should still be present + if got := tr.GetStatus(clientID, 1); got != hotstuff.UNKNOWN { + t.Fatalf("after extend GetStatus(1) = %v; want %v", got, hotstuff.UNKNOWN) + } +} + +func TestCommandStatusTracker_CleanupSlidesAndDeletes(t *testing.T) { + tr := NewCommandStatusTracker() + clientID := uint32(3) + + // set statuses for seqs 1..10 + for i := uint64(1); i <= 10; i++ { + tr.SetStatus(clientID, i, hotstuff.UNKNOWN) + } + + // cleanup up to 4 -> base should become 5, entries 1..4 removed + tr.Cleanup(clientID, 4) + // seq 3 should now be treated as executed (cleaned up) + if got := tr.GetStatus(clientID, 3); got != hotstuff.EXECUTED { + t.Fatalf("after cleanup GetStatus(3) = %v; want EXECUTED", got) + } + // seq 5 should still be UNKNOWN + if got := tr.GetStatus(clientID, 5); got != hotstuff.UNKNOWN { + t.Fatalf("after cleanup GetStatus(5) = %v; want %v", got, hotstuff.UNKNOWN) + } + + // cleanup up to 10 -> should remove entire window + tr.Cleanup(clientID, 10) + if got := tr.GetStatus(clientID, 5); got != hotstuff.EXECUTED { + t.Fatalf("after full cleanup GetStatus(5) = %v; want EXECUTED", got) + } + // snapshot should be empty for this client + // snapshot should not contain cleaned-up seqs 1..10 + if m := tr.GetClientStatuses(clientID); len(m) != 0 { + for i := uint64(1); i <= 10; i++ { + if _, ok := m[i]; ok { + t.Fatalf("snapshot contains cleaned-up seq %d", i) + } + } + // It's an implementation detail whether the snapshot is empty or contains + // preallocated entries for sequences > cleanup point (which is why len(m) + // may be 4990). We only assert here that cleaned-up sequences are gone. 
+ } +} + +func TestCommandStatusTracker_IgnoreOldSetStatus(t *testing.T) { + tr := NewCommandStatusTracker() + clientID := uint32(4) + + // create window at seq 100 + tr.SetStatus(clientID, 100, hotstuff.UNKNOWN) + // attempt to set older seq 90 -> should be ignored + tr.SetStatus(clientID, 90, hotstuff.FAILED) + if got := tr.GetStatus(clientID, 90); got != hotstuff.EXECUTED { + t.Fatalf("old SetStatus should be ignored; GetStatus(90) = %v; want EXECUTED", got) + } + // newer seq should still be present + if got := tr.GetStatus(clientID, 100); got != hotstuff.UNKNOWN { + t.Fatalf("GetStatus(100) = %v; want %v", got, hotstuff.UNKNOWN) + } +} + +func TestCommandStatusTracker_GetClientStatusesSnapshot(t *testing.T) { + tr := NewCommandStatusTracker() + clientID := uint32(5) + + tr.SetStatus(clientID, 50, hotstuff.UNKNOWN) + tr.SetStatus(clientID, 52, hotstuff.FAILED) + + snap := tr.GetClientStatuses(clientID) + if snap[50] != hotstuff.UNKNOWN { + t.Fatalf("snapshot[50] = %v; want %v", snap[50], hotstuff.UNKNOWN) + } + if snap[52] != hotstuff.FAILED { + t.Fatalf("snapshot[52] = %v; want %v", snap[52], hotstuff.FAILED) + } + // a sequence not set but within window will have zero value; ensure an unrelated seq returns EXECUTED via GetStatus + if got := tr.GetStatus(clientID, 49); got != hotstuff.EXECUTED { + t.Fatalf("GetStatus(49) = %v; want EXECUTED", got) + } +} From 3482caa9a9ce7dacdec0480aba3b604ede7a2046 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Wed, 28 Jan 2026 20:42:30 +0100 Subject: [PATCH 4/5] chore: go mod tidy and regenerate proto files --- go.mod | 1 - internal/proto/clientpb/client.pb.go | 4 ++-- internal/proto/clientpb/client_gorums.pb.go | 4 ++-- internal/proto/hotstuffpb/hotstuff.pb.go | 12 ++++++------ internal/proto/hotstuffpb/hotstuff_gorums.pb.go | 10 +++++----- 5 files changed, 15 insertions(+), 16 deletions(-) diff --git a/go.mod b/go.mod index 664d5d6b8..43264eae6 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.25.6 require ( 
cuelang.org/go v0.15.4 github.com/felixge/fgprof v0.9.5 - github.com/golang/protobuf v1.5.4 github.com/google/go-cmp v0.7.0 github.com/kilic/bls12-381 v0.1.1-0.20210208205449-6045b0235e36 github.com/mroth/weightedrand v1.0.0 diff --git a/internal/proto/clientpb/client.pb.go b/internal/proto/clientpb/client.pb.go index 01be25d45..6c650f076 100644 --- a/internal/proto/clientpb/client.pb.go +++ b/internal/proto/clientpb/client.pb.go @@ -7,10 +7,10 @@ package clientpb import ( - empty "github.com/golang/protobuf/ptypes/empty" _ "github.com/relab/gorums" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" + emptypb "google.golang.org/protobuf/types/known/emptypb" reflect "reflect" sync "sync" unsafe "unsafe" @@ -281,7 +281,7 @@ var file_internal_proto_clientpb_client_proto_goTypes = []any{ (*CommandStatusResponse)(nil), // 1: clientpb.CommandStatusResponse (*Command)(nil), // 2: clientpb.Command (*Batch)(nil), // 3: clientpb.Batch - (*empty.Empty)(nil), // 4: google.protobuf.Empty + (*emptypb.Empty)(nil), // 4: google.protobuf.Empty } var file_internal_proto_clientpb_client_proto_depIdxs = []int32{ 0, // 0: clientpb.CommandStatusResponse.status:type_name -> clientpb.CommandStatusResponse.Status diff --git a/internal/proto/clientpb/client_gorums.pb.go b/internal/proto/clientpb/client_gorums.pb.go index 5d02ee5ec..532a43fa7 100644 --- a/internal/proto/clientpb/client_gorums.pb.go +++ b/internal/proto/clientpb/client_gorums.pb.go @@ -9,10 +9,10 @@ package clientpb import ( context "context" fmt "fmt" - empty "github.com/golang/protobuf/ptypes/empty" gorums "github.com/relab/gorums" encoding "google.golang.org/grpc/encoding" proto "google.golang.org/protobuf/proto" + emptypb "google.golang.org/protobuf/types/known/emptypb" ) const ( @@ -160,7 +160,7 @@ type ClientClient interface { var _ ClientClient = (*Configuration)(nil) // Reference imports to suppress errors if they are not otherwise used. 
-var _ empty.Empty +var _ emptypb.Empty // ExecCommand sends a command to all replicas and waits for valid signatures // from f+1 replicas diff --git a/internal/proto/hotstuffpb/hotstuff.pb.go b/internal/proto/hotstuffpb/hotstuff.pb.go index b63c1f474..dad5fc927 100644 --- a/internal/proto/hotstuffpb/hotstuff.pb.go +++ b/internal/proto/hotstuffpb/hotstuff.pb.go @@ -7,12 +7,12 @@ package hotstuffpb import ( - empty "github.com/golang/protobuf/ptypes/empty" - timestamp "github.com/golang/protobuf/ptypes/timestamp" _ "github.com/relab/gorums" clientpb "github.com/relab/hotstuff/internal/proto/clientpb" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" + emptypb "google.golang.org/protobuf/types/known/emptypb" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" reflect "reflect" sync "sync" unsafe "unsafe" @@ -128,7 +128,7 @@ type Block struct { View uint64 `protobuf:"varint,3,opt,name=View,proto3" json:"View,omitempty"` Commands *clientpb.Batch `protobuf:"bytes,4,opt,name=Commands,proto3" json:"Commands,omitempty"` Proposer uint32 `protobuf:"varint,5,opt,name=Proposer,proto3" json:"Proposer,omitempty"` - Timestamp *timestamp.Timestamp `protobuf:"bytes,6,opt,name=Timestamp,proto3" json:"Timestamp,omitempty"` + Timestamp *timestamppb.Timestamp `protobuf:"bytes,6,opt,name=Timestamp,proto3" json:"Timestamp,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -198,7 +198,7 @@ func (x *Block) GetProposer() uint32 { return 0 } -func (x *Block) GetTimestamp() *timestamp.Timestamp { +func (x *Block) GetTimestamp() *timestamppb.Timestamp { if x != nil { return x.Timestamp } @@ -1152,8 +1152,8 @@ var file_internal_proto_hotstuffpb_hotstuff_proto_goTypes = []any{ (*AggQC)(nil), // 16: hotstuffpb.AggQC nil, // 17: hotstuffpb.AggQC.QCsEntry (*clientpb.Batch)(nil), // 18: clientpb.Batch - (*timestamp.Timestamp)(nil), // 19: google.protobuf.Timestamp - 
(*empty.Empty)(nil), // 20: google.protobuf.Empty + (*timestamppb.Timestamp)(nil), // 19: google.protobuf.Timestamp + (*emptypb.Empty)(nil), // 20: google.protobuf.Empty } var file_internal_proto_hotstuffpb_hotstuff_proto_depIdxs = []int32{ 2, // 0: hotstuffpb.Proposal.Block:type_name -> hotstuffpb.Block diff --git a/internal/proto/hotstuffpb/hotstuff_gorums.pb.go b/internal/proto/hotstuffpb/hotstuff_gorums.pb.go index efcaad5c9..f0545f18e 100644 --- a/internal/proto/hotstuffpb/hotstuff_gorums.pb.go +++ b/internal/proto/hotstuffpb/hotstuff_gorums.pb.go @@ -9,10 +9,10 @@ package hotstuffpb import ( context "context" fmt "fmt" - empty "github.com/golang/protobuf/ptypes/empty" gorums "github.com/relab/gorums" encoding "google.golang.org/grpc/encoding" proto "google.golang.org/protobuf/proto" + emptypb "google.golang.org/protobuf/types/known/emptypb" ) const ( @@ -170,7 +170,7 @@ type ConsensusNodeClient interface { var _ ConsensusNodeClient = (*Node)(nil) // Reference imports to suppress errors if they are not otherwise used. -var _ empty.Empty +var _ emptypb.Empty // Propose is a quorum call invoked on all nodes in configuration c, // with the same argument in, and returns a combined result. @@ -184,7 +184,7 @@ func (c *Configuration) Propose(ctx context.Context, in *Proposal, opts ...gorum } // Reference imports to suppress errors if they are not otherwise used. -var _ empty.Empty +var _ emptypb.Empty // Timeout is a quorum call invoked on all nodes in configuration c, // with the same argument in, and returns a combined result. @@ -275,7 +275,7 @@ type internalBlock struct { } // Reference imports to suppress errors if they are not otherwise used. -var _ empty.Empty +var _ emptypb.Empty // Vote is a quorum call invoked on all nodes in configuration c, // with the same argument in, and returns a combined result. 
@@ -289,7 +289,7 @@ func (n *Node) Vote(ctx context.Context, in *PartialCert, opts ...gorums.CallOpt } // Reference imports to suppress errors if they are not otherwise used. -var _ empty.Empty +var _ emptypb.Empty // NewView is a quorum call invoked on all nodes in configuration c, // with the same argument in, and returns a combined result. From 38035b8a9458f19823c805a90565891b0295a272 Mon Sep 17 00:00:00 2001 From: hanish520 Date: Mon, 2 Feb 2026 00:35:30 -0800 Subject: [PATCH 5/5] Modified the client design based on the discussion (still WIP). Client latency measurement is lost with these changes. The client now sends the commands, fetches their status every 100ms, and updates the highest committed command ID and the set of failed commands. This reduces the number of RPCs made to the server for each command. The client now sends far more commands than before, so the rate-limiting mechanism needs to be adjusted accordingly. --- client/client.go | 269 +++++++++++--------- internal/proto/clientpb/client.pb.go | 127 +++------ internal/proto/clientpb/client.proto | 11 +- internal/proto/clientpb/client_gorums.pb.go | 2 +- server/clientio.go | 179 +++++-------- server/clientio_test.go | 141 ++++------ types.go | 10 - 7 files changed, 303 insertions(+), 436 deletions(-) diff --git a/client/client.go b/client/client.go index 750968c00..6536b77d8 100644 --- a/client/client.go +++ b/client/client.go @@ -9,7 +9,7 @@ import ( "errors" "io" "math" - "strings" + "slices" "sync" "time" @@ -31,32 +31,65 @@ type qspec struct { faulty int } +// leastOverlapSet returns the set of uint64 values that appear in all slices. 
+func leastOverlapSet(slices [][]uint64) []uint64 { + if len(slices) == 0 { + return []uint64{} + } + + // Count occurrences of each value across all slices + occurrences := make(map[uint64]int) + for _, slice := range slices { + seen := make(map[uint64]bool) + for _, val := range slice { + if !seen[val] { + occurrences[val]++ + seen[val] = true + } + } + } + + // Find values that appear in all slices + result := []uint64{} + numSlices := len(slices) + for val, count := range occurrences { + if count == numSlices { + result = append(result, val) + } + } + + return result +} + func (q *qspec) CommandStatusQF(command *clientpb.Command, replies map[uint32]*clientpb.CommandStatusResponse) (*clientpb.CommandStatusResponse, bool) { if len(replies) < q.faulty+1 { return nil, false } - responseCount := make([]int, 4) // assuming 4 possible statuses - - for _, resp := range replies { - if resp != nil { - status := resp.Status - if int(status) >= 0 && int(status) < len(responseCount) { - responseCount[int(status)]++ - } + successfulHighestCmds := make([]uint64, 0, len(replies)) + commandCount := make(map[uint64]int) + failedCommandIdSets := make([][]uint64, 0, len(replies)) + for _, reply := range replies { + successfulHighestCmds = append(successfulHighestCmds, reply.GetHighestSequenceNumber()) + _, ok := commandCount[reply.GetHighestSequenceNumber()] + if !ok { + commandCount[reply.GetHighestSequenceNumber()] = 1 + continue } + commandCount[reply.GetHighestSequenceNumber()]++ + failedCommandIdSets = append(failedCommandIdSets, reply.GetFailedSequenceNumbers()) } - for status, count := range responseCount { - if count >= q.faulty+1 { + leastOverlapFailedCmds := leastOverlapSet(failedCommandIdSets) + slices.Sort(successfulHighestCmds) + slices.Reverse(successfulHighestCmds) + for _, cmd := range successfulHighestCmds { + if commandCount[cmd] >= q.faulty+1 { return &clientpb.CommandStatusResponse{ - Command: command, - Status: clientpb.CommandStatusResponse_Status(status), + 
HighestSequenceNumber: cmd, + FailedSequenceNumbers: leastOverlapFailedCmds, }, true } } - return &clientpb.CommandStatusResponse{ - Command: command, - Status: clientpb.CommandStatusResponse_PENDING, - }, false + return nil, false } type pendingCmd struct { @@ -85,19 +118,19 @@ type Client struct { logger logging.Logger id ID - mut sync.Mutex - mgr *clientpb.Manager - gorumsConfig *clientpb.Configuration - payloadSize uint32 - highestCommitted uint64 // highest sequence number acknowledged by the replicas - pendingCmds chan pendingCmd - cancel context.CancelFunc - done chan struct{} - reader io.ReadCloser - limiter *rate.Limiter - stepUp float64 - stepUpInterval time.Duration - timeout time.Duration + mut sync.Mutex + mgr *clientpb.Manager + gorumsConfig *clientpb.Configuration + payloadSize uint32 + highestCommitted uint64 // highest sequence number acknowledged by the replicas + cancel context.CancelFunc + done chan struct{} + reader io.ReadCloser + limiter *rate.Limiter + stepUp float64 + stepUpInterval time.Duration + timeout time.Duration + failedCommandIdSet *BitSet } // New returns a new Client. 
@@ -112,17 +145,16 @@ func New( logger: logger, id: id, - pendingCmds: make(chan pendingCmd, conf.MaxConcurrent), - highestCommitted: 1, - done: make(chan struct{}), - reader: conf.Input, - payloadSize: conf.PayloadSize, - limiter: rate.NewLimiter(rate.Limit(conf.RateLimit), 1), - stepUp: conf.RateStep, - stepUpInterval: conf.RateStepInterval, - timeout: conf.Timeout, + highestCommitted: 1, + done: make(chan struct{}), + reader: conf.Input, + payloadSize: conf.PayloadSize, + limiter: rate.NewLimiter(rate.Limit(conf.RateLimit), 1), + stepUp: conf.RateStep, + stepUpInterval: conf.RateStepInterval, + timeout: conf.Timeout, + failedCommandIdSet: NewBitSet(5000000), // assuming max 5 million commands for now } - var creds credentials.TransportCredentials if conf.TLS { creds = credentials.NewClientTLSFromCert(conf.RootCAs, "") @@ -204,7 +236,6 @@ func (c *Client) Stop() { func (c *Client) close() { // Signal the command handler to stop fetching statuses before closing the manager. - close(c.pendingCmds) c.mgr.Close() err := c.reader.Close() if err != nil { @@ -220,7 +251,6 @@ func (c *Client) sendCommands(ctx context.Context) error { nextLogTime = time.Now().Add(time.Second) ) -loop: for ctx.Err() == nil { // step up the rate limiter @@ -259,104 +289,45 @@ loop: SequenceNumber: num, Data: data[:n], } - - ctx, cancel := context.WithTimeout(ctx, c.timeout) - c.gorumsConfig.ExecCommand(ctx, cmd) - pending := pendingCmd{sequenceNumber: num, sendTime: time.Now(), cancelCtx: cancel} - + c.gorumsConfig.ExecCommand(context.Background(), cmd) num++ - select { - case c.pendingCmds <- pending: - case <-ctx.Done(): - break loop - } - if time.Now().After(nextLogTime) { c.logger.Infof("%d commands sent so far", num) nextLogTime = time.Now().Add(time.Second) } - } return nil } -func (c *Client) fetchCommandStatus(sequenceNumber uint64) hotstuff.CommandStatus { - ticker := time.NewTicker(100 * time.Millisecond) - defer ticker.Stop() - timeout := time.After(c.timeout) - - for { - select 
{ - case <-ticker.C: - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - cmd := &clientpb.Command{ - ClientID: uint32(c.id), - SequenceNumber: sequenceNumber, - } - - response, err := c.gorumsConfig.CommandStatus(ctx, cmd) - cancel() - - if err != nil { - c.logger.Errorf("Failed to fetch command status (client: %d, sequence: %d): %v", c.id, sequenceNumber, err) - // If the node/manager was closed, stop trying and return UNKNOWN. - if strings.Contains(err.Error(), "node closed") { - return hotstuff.UNKNOWN - } - continue - } - if response == nil || response.Command == nil { - c.logger.Errorf("Invalid response received when fetching command status (client: %d, sequence: %d)", c.id, sequenceNumber) - continue - } - c.logger.Infof("Fetched command status (client: %d, sequence: %d, status: %d)", c.id, sequenceNumber, response.Status) - status := hotstuff.CommandStatus(response.Status) - if status == hotstuff.COMMITTED || status == hotstuff.EXECUTED || status == hotstuff.FAILED { - return status - } - case <-timeout: - return hotstuff.UNKNOWN - } - } -} - // handleCommands will get pending commands from the pendingCmds channel and then // handle them as they become acknowledged by the replicas. We expect the commands to be // acknowledged in the order that they were sent. 
func (c *Client) handleCommands(ctx context.Context) (executed, failed, timeout int) { for { - var ( - cmd pendingCmd - ok bool - ) + statusRefresher := time.NewTicker(100 * time.Millisecond) select { - case cmd, ok = <-c.pendingCmds: - if !ok { - return + case <-statusRefresher.C: + commandStatus, err := c.gorumsConfig.CommandStatus(ctx, &clientpb.Command{ + ClientID: uint32(c.id), + }) + if err != nil { + c.logger.Error("Failed to get command status: ", err) + continue + } + c.mut.Lock() + if c.highestCommitted < commandStatus.HighestSequenceNumber { + c.highestCommitted = commandStatus.HighestSequenceNumber + } + for _, failedSeqNum := range commandStatus.FailedSequenceNumbers { + c.failedCommandIdSet.Add(failedSeqNum) } + failed = c.failedCommandIdSet.Count() + executed = int(c.highestCommitted) - failed + timeout = 0 + c.mut.Unlock() case <-ctx.Done(): return } - response := c.fetchCommandStatus(cmd.sequenceNumber) - - switch response { - case hotstuff.UNKNOWN: - c.logger.Infof("Command timed out (client: %d, sequence: %d)", c.id, cmd.sequenceNumber) - timeout++ - case hotstuff.FAILED: - c.logger.Infof("Command failed (client: %d, sequence: %d)", c.id, cmd.sequenceNumber) - failed++ - default: - c.logger.Infof("Command executed (client: %d, sequence: %d)", c.id, cmd.sequenceNumber) - executed++ - } - c.mut.Lock() - if cmd.sequenceNumber > c.highestCommitted { - c.highestCommitted = cmd.sequenceNumber - } - c.mut.Unlock() - duration := time.Since(cmd.sendTime) - c.eventLoop.AddEvent(LatencyMeasurementEvent{Latency: duration}) } } @@ -364,3 +335,59 @@ func (c *Client) handleCommands(ctx context.Context) (executed, failed, timeout type LatencyMeasurementEvent struct { Latency time.Duration } + +// BitSet is a space-efficient set for uint64 values +type BitSet struct { + mut sync.Mutex + bits []uint64 +} + +func NewBitSet(maxVal uint64) *BitSet { + size := (maxVal / 64) + 1 + return &BitSet{ + bits: make([]uint64, size), + } +} + +func (bs *BitSet) Add(val uint64) 
{ + bs.mut.Lock() + defer bs.mut.Unlock() + + index := val / 64 + offset := val % 64 + if index < uint64(len(bs.bits)) { + bs.bits[index] |= (1 << offset) + } +} + +func (bs *BitSet) Contains(val uint64) bool { + bs.mut.Lock() + defer bs.mut.Unlock() + + index := val / 64 + offset := val % 64 + if index < uint64(len(bs.bits)) { + return (bs.bits[index] & (1 << offset)) != 0 + } + return false +} + +func (bs *BitSet) Count() int { + bs.mut.Lock() + defer bs.mut.Unlock() + + count := 0 + for _, word := range bs.bits { + count += popcount(word) + } + return count +} + +func popcount(x uint64) int { + count := 0 + for x != 0 { + x &= x - 1 + count++ + } + return count +} diff --git a/internal/proto/clientpb/client.pb.go b/internal/proto/clientpb/client.pb.go index 6c650f076..14da714e5 100644 --- a/internal/proto/clientpb/client.pb.go +++ b/internal/proto/clientpb/client.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc v6.33.4 +// protoc v6.30.2 // source: internal/proto/clientpb/client.proto package clientpb @@ -23,67 +23,12 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -type CommandStatusResponse_Status int32 - -const ( - CommandStatusResponse_UNKNOWN CommandStatusResponse_Status = 0 - CommandStatusResponse_PENDING CommandStatusResponse_Status = 1 - CommandStatusResponse_COMMITTED CommandStatusResponse_Status = 2 - CommandStatusResponse_EXECUTED CommandStatusResponse_Status = 3 - CommandStatusResponse_FAILED CommandStatusResponse_Status = 4 -) - -// Enum value maps for CommandStatusResponse_Status. 
-var ( - CommandStatusResponse_Status_name = map[int32]string{ - 0: "UNKNOWN", - 1: "PENDING", - 2: "COMMITTED", - 3: "EXECUTED", - 4: "FAILED", - } - CommandStatusResponse_Status_value = map[string]int32{ - "UNKNOWN": 0, - "PENDING": 1, - "COMMITTED": 2, - "EXECUTED": 3, - "FAILED": 4, - } -) - -func (x CommandStatusResponse_Status) Enum() *CommandStatusResponse_Status { - p := new(CommandStatusResponse_Status) - *p = x - return p -} - -func (x CommandStatusResponse_Status) String() string { - return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) -} - -func (CommandStatusResponse_Status) Descriptor() protoreflect.EnumDescriptor { - return file_internal_proto_clientpb_client_proto_enumTypes[0].Descriptor() -} - -func (CommandStatusResponse_Status) Type() protoreflect.EnumType { - return &file_internal_proto_clientpb_client_proto_enumTypes[0] -} - -func (x CommandStatusResponse_Status) Number() protoreflect.EnumNumber { - return protoreflect.EnumNumber(x) -} - -// Deprecated: Use CommandStatusResponse_Status.Descriptor instead. 
-func (CommandStatusResponse_Status) EnumDescriptor() ([]byte, []int) { - return file_internal_proto_clientpb_client_proto_rawDescGZIP(), []int{0, 0} -} - type CommandStatusResponse struct { - state protoimpl.MessageState `protogen:"open.v1"` - Status CommandStatusResponse_Status `protobuf:"varint,1,opt,name=status,proto3,enum=clientpb.CommandStatusResponse_Status" json:"status,omitempty"` - Command *Command `protobuf:"bytes,2,opt,name=command,proto3" json:"command,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + HighestSequenceNumber uint64 `protobuf:"varint,1,opt,name=highestSequenceNumber,proto3" json:"highestSequenceNumber,omitempty"` + FailedSequenceNumbers []uint64 `protobuf:"varint,2,rep,packed,name=failedSequenceNumbers,proto3" json:"failedSequenceNumbers,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *CommandStatusResponse) Reset() { @@ -116,16 +61,16 @@ func (*CommandStatusResponse) Descriptor() ([]byte, []int) { return file_internal_proto_clientpb_client_proto_rawDescGZIP(), []int{0} } -func (x *CommandStatusResponse) GetStatus() CommandStatusResponse_Status { +func (x *CommandStatusResponse) GetHighestSequenceNumber() uint64 { if x != nil { - return x.Status + return x.HighestSequenceNumber } - return CommandStatusResponse_UNKNOWN + return 0 } -func (x *CommandStatusResponse) GetCommand() *Command { +func (x *CommandStatusResponse) GetFailedSequenceNumbers() []uint64 { if x != nil { - return x.Command + return x.FailedSequenceNumbers } return nil } @@ -241,17 +186,10 @@ var File_internal_proto_clientpb_client_proto protoreflect.FileDescriptor const file_internal_proto_clientpb_client_proto_rawDesc = "" + "\n" + - "$internal/proto/clientpb/client.proto\x12\bclientpb\x1a\fgorums.proto\x1a\x1bgoogle/protobuf/empty.proto\"\xd1\x01\n" + - "\x15CommandStatusResponse\x12>\n" + - "\x06status\x18\x01 
\x01(\x0e2&.clientpb.CommandStatusResponse.StatusR\x06status\x12+\n" + - "\acommand\x18\x02 \x01(\v2\x11.clientpb.CommandR\acommand\"K\n" + - "\x06Status\x12\v\n" + - "\aUNKNOWN\x10\x00\x12\v\n" + - "\aPENDING\x10\x01\x12\r\n" + - "\tCOMMITTED\x10\x02\x12\f\n" + - "\bEXECUTED\x10\x03\x12\n" + - "\n" + - "\x06FAILED\x10\x04\"a\n" + + "$internal/proto/clientpb/client.proto\x12\bclientpb\x1a\fgorums.proto\x1a\x1bgoogle/protobuf/empty.proto\"\x83\x01\n" + + "\x15CommandStatusResponse\x124\n" + + "\x15highestSequenceNumber\x18\x01 \x01(\x04R\x15highestSequenceNumber\x124\n" + + "\x15failedSequenceNumbers\x18\x02 \x03(\x04R\x15failedSequenceNumbers\"a\n" + "\aCommand\x12\x1a\n" + "\bClientID\x18\x01 \x01(\rR\bClientID\x12&\n" + "\x0eSequenceNumber\x18\x02 \x01(\x04R\x0eSequenceNumber\x12\x12\n" + @@ -274,28 +212,24 @@ func file_internal_proto_clientpb_client_proto_rawDescGZIP() []byte { return file_internal_proto_clientpb_client_proto_rawDescData } -var file_internal_proto_clientpb_client_proto_enumTypes = make([]protoimpl.EnumInfo, 1) var file_internal_proto_clientpb_client_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_internal_proto_clientpb_client_proto_goTypes = []any{ - (CommandStatusResponse_Status)(0), // 0: clientpb.CommandStatusResponse.Status - (*CommandStatusResponse)(nil), // 1: clientpb.CommandStatusResponse - (*Command)(nil), // 2: clientpb.Command - (*Batch)(nil), // 3: clientpb.Batch - (*emptypb.Empty)(nil), // 4: google.protobuf.Empty + (*CommandStatusResponse)(nil), // 0: clientpb.CommandStatusResponse + (*Command)(nil), // 1: clientpb.Command + (*Batch)(nil), // 2: clientpb.Batch + (*emptypb.Empty)(nil), // 3: google.protobuf.Empty } var file_internal_proto_clientpb_client_proto_depIdxs = []int32{ - 0, // 0: clientpb.CommandStatusResponse.status:type_name -> clientpb.CommandStatusResponse.Status - 2, // 1: clientpb.CommandStatusResponse.command:type_name -> clientpb.Command - 2, // 2: clientpb.Batch.Commands:type_name -> clientpb.Command - 
2, // 3: clientpb.Client.ExecCommand:input_type -> clientpb.Command - 2, // 4: clientpb.Client.CommandStatus:input_type -> clientpb.Command - 4, // 5: clientpb.Client.ExecCommand:output_type -> google.protobuf.Empty - 1, // 6: clientpb.Client.CommandStatus:output_type -> clientpb.CommandStatusResponse - 5, // [5:7] is the sub-list for method output_type - 3, // [3:5] is the sub-list for method input_type - 3, // [3:3] is the sub-list for extension type_name - 3, // [3:3] is the sub-list for extension extendee - 0, // [0:3] is the sub-list for field type_name + 1, // 0: clientpb.Batch.Commands:type_name -> clientpb.Command + 1, // 1: clientpb.Client.ExecCommand:input_type -> clientpb.Command + 1, // 2: clientpb.Client.CommandStatus:input_type -> clientpb.Command + 3, // 3: clientpb.Client.ExecCommand:output_type -> google.protobuf.Empty + 0, // 4: clientpb.Client.CommandStatus:output_type -> clientpb.CommandStatusResponse + 3, // [3:5] is the sub-list for method output_type + 1, // [1:3] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name } func init() { file_internal_proto_clientpb_client_proto_init() } @@ -308,14 +242,13 @@ func file_internal_proto_clientpb_client_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_internal_proto_clientpb_client_proto_rawDesc), len(file_internal_proto_clientpb_client_proto_rawDesc)), - NumEnums: 1, + NumEnums: 0, NumMessages: 3, NumExtensions: 0, NumServices: 1, }, GoTypes: file_internal_proto_clientpb_client_proto_goTypes, DependencyIndexes: file_internal_proto_clientpb_client_proto_depIdxs, - EnumInfos: file_internal_proto_clientpb_client_proto_enumTypes, MessageInfos: file_internal_proto_clientpb_client_proto_msgTypes, }.Build() File_internal_proto_clientpb_client_proto = out.File diff --git 
a/internal/proto/clientpb/client.proto b/internal/proto/clientpb/client.proto index 6b3258e68..b64958a33 100644 --- a/internal/proto/clientpb/client.proto +++ b/internal/proto/clientpb/client.proto @@ -21,15 +21,8 @@ service Client { } message CommandStatusResponse { - enum Status { - UNKNOWN = 0; - PENDING = 1; - COMMITTED = 2; - EXECUTED = 3; - FAILED = 4; - } - Status status = 1; - Command command = 2; + uint64 highestSequenceNumber = 1; + repeated uint64 failedSequenceNumbers = 2; } diff --git a/internal/proto/clientpb/client_gorums.pb.go b/internal/proto/clientpb/client_gorums.pb.go index 532a43fa7..cf8fa1504 100644 --- a/internal/proto/clientpb/client_gorums.pb.go +++ b/internal/proto/clientpb/client_gorums.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-gorums. DO NOT EDIT. // versions: // protoc-gen-gorums v0.10.0-devel -// protoc v6.33.4 +// protoc v6.30.2 // source: internal/proto/clientpb/client.proto package clientpb diff --git a/server/clientio.go b/server/clientio.go index 113872297..54f786f05 100644 --- a/server/clientio.go +++ b/server/clientio.go @@ -7,122 +7,82 @@ import ( "sync" "github.com/relab/gorums" - "github.com/relab/hotstuff" "github.com/relab/hotstuff/core/eventloop" "github.com/relab/hotstuff/core/logging" "github.com/relab/hotstuff/internal/proto/clientpb" ) -// clientStatusWindow tracks command statuses for a single client using an array-based approach. -// This is more memory-efficient than nested maps, especially for increasing sequence numbers. +const ( + // hotstuffFailedCommandLength is the maximum number of failed sequence numbers to track per client + hotstuffFailedCommandLength = 100 +) + +// clientStatusWindow tracks command statuses for a single client. +// It stores the highest successfully executed sequence number and a list of failed sequence numbers. 
type clientStatusWindow struct { - baseSeqNum uint64 // starting sequence number for the current window - statuses []hotstuff.CommandStatus // array indexed by (seqNum - baseSeqNum) + // HighestSuccess is the highest successfully executed sequence number for this client + HighestSuccess uint64 + // FailedCmds is the list of recent failed sequence numbers (up to maxFailed entries) + FailedCmds []uint64 } -// CommandStatusTracker efficiently tracks command status per client per sequence number. -// Uses array-based storage per client to avoid the overhead of nested maps. -// Supports sliding window cleanup to prevent unbounded memory growth. +// CommandStatusTracker stores per-client command status information. +// It tracks the highest successful sequence number and recent failed sequence numbers for each client. type CommandStatusTracker struct { - // clientWindows maps ClientID -> client status window + // clientWindows maps ClientID to its status window clientWindows map[uint32]*clientStatusWindow + // maxFailed is the maximum number of failed sequence numbers to track per client + maxFailed int } -// NewCommandStatusTracker creates a new status tracker -func NewCommandStatusTracker() *CommandStatusTracker { - return &CommandStatusTracker{ - clientWindows: make(map[uint32]*clientStatusWindow), - } -} - -// SetStatus sets the status of a command -func (cst *CommandStatusTracker) SetStatus(clientID uint32, seqNum uint64, status hotstuff.CommandStatus) { - window, exists := cst.clientWindows[clientID] - if !exists { - // Initialize new window for this client, starting at this sequence number - window = &clientStatusWindow{ - baseSeqNum: seqNum, - statuses: make([]hotstuff.CommandStatus, 5000), +// ensureWindow returns the status window for clientID, creating it if it doesn't exist. 
+func (cst *CommandStatusTracker) ensureWindow(clientID uint32) *clientStatusWindow { + w, ok := cst.clientWindows[clientID] + if !ok { + w = &clientStatusWindow{ + FailedCmds: make([]uint64, 0, cst.maxFailed), } - cst.clientWindows[clientID] = window - window.statuses[0] = status - return - } - - // Check if seqNum is within current window - if seqNum >= window.baseSeqNum { - index := seqNum - window.baseSeqNum - // Extend array if necessary - if index >= uint64(len(window.statuses)) { - // Grow array by 50% or enough to fit the new index, whichever is larger - newLen := len(window.statuses) + len(window.statuses)/2 + 1 - if int(index) >= newLen { - newLen = int(index) + 1 - } - newStatuses := make([]hotstuff.CommandStatus, newLen) - copy(newStatuses, window.statuses) - window.statuses = newStatuses - } - window.statuses[index] = status + cst.clientWindows[clientID] = w } - // Ignore updates for seqNum < baseSeqNum (already cleaned up) + return w } -// GetStatus retrieves the status of a command. Returns StatusExecuted if not found. -func (cst *CommandStatusTracker) GetStatus(clientID uint32, seqNum uint64) hotstuff.CommandStatus { - window, exists := cst.clientWindows[clientID] - if !exists { - return hotstuff.EXECUTED +// addFailed records a failed sequence number in the window. +// If the window is full (maxFailed entries), it removes the oldest entry before adding the new one. +func (cst *CommandStatusTracker) addFailed(client uint32, seq uint64) { + w := cst.ensureWindow(client) + // If we have reached maxFailed, remove the oldest entry + if len(w.FailedCmds) >= cst.maxFailed { + w.FailedCmds = w.FailedCmds[1:] } + w.FailedCmds = append(w.FailedCmds, seq) +} - // Check if seqNum is within current window - if seqNum >= window.baseSeqNum && seqNum < window.baseSeqNum+uint64(len(window.statuses)) { - index := seqNum - window.baseSeqNum - return window.statuses[index] +// NewCommandStatusTracker creates a new CommandStatusTracker with default settings. 
+func NewCommandStatusTracker() *CommandStatusTracker { + return &CommandStatusTracker{ + clientWindows: make(map[uint32]*clientStatusWindow), + maxFailed: hotstuffFailedCommandLength, } - - // If outside window (cleaned up or not yet added), assume executed - return hotstuff.EXECUTED } -// Cleanup removes entries for sequence numbers less than or equal to the given threshold per client. -// This prevents unbounded memory growth by sliding the window forward. -func (cst *CommandStatusTracker) Cleanup(clientID uint32, upToSeqNum uint64) { - window, exists := cst.clientWindows[clientID] - if !exists { +// setSuccess updates the highest successfully executed sequence number for a client. +// It only updates if the new sequence number is higher than the current highest. +func (cst *CommandStatusTracker) setSuccess(clientID uint32, seqNum uint64) { + w := cst.ensureWindow(clientID) + if seqNum < w.HighestSuccess { return } - - // Calculate how many entries to remove from the front - if upToSeqNum >= window.baseSeqNum { - entriesToRemove := int(upToSeqNum - window.baseSeqNum + 1) - if entriesToRemove >= len(window.statuses) { - // Remove entire window, clean up the client entry - delete(cst.clientWindows, clientID) - return - } - - // Slide the window forward - window.statuses = window.statuses[entriesToRemove:] - window.baseSeqNum = upToSeqNum + 1 - } + w.HighestSuccess = seqNum } -// GetClientStatuses returns a snapshot of all statuses for a given client (for testing/debugging) -func (cst *CommandStatusTracker) GetClientStatuses(clientID uint32) map[uint64]hotstuff.CommandStatus { - window, exists := cst.clientWindows[clientID] - if !exists { - return make(map[uint64]hotstuff.CommandStatus) - } - - snapshot := make(map[uint64]hotstuff.CommandStatus, len(window.statuses)) - for i, status := range window.statuses { - snapshot[window.baseSeqNum+uint64(i)] = status - } - return snapshot +// GetClientStatuses returns the status window for a given client. 
+// If the client doesn't exist, it creates and returns an empty window. +func (cst *CommandStatusTracker) GetClientStatuses(clientID uint32) *clientStatusWindow { + return cst.ensureWindow(clientID) } -// ClientIO serves a client. +// ClientIO serves client requests and manages command execution tracking. type ClientIO struct { logger logging.Logger cmdCache *clientpb.CommandCache @@ -132,12 +92,12 @@ type ClientIO struct { hash hash.Hash cmdCount uint32 - lastExecutedSeqNum map[uint32]uint64 // highest executed sequence number per client ID - statusTracker *CommandStatusTracker // tracks status of all commands (executed/aborted/failed) - + lastExecutedSeqNum map[uint32]uint64 // tracks the highest executed sequence number per client ID + statusTracker *CommandStatusTracker // tracks command execution status (success/failure) per client } -// NewClientIO returns a new client IO server. +// NewClientIO creates and returns a new ClientIO server instance. +// It registers the server with the event loop to handle Execute and Abort events. func NewClientIO( el *eventloop.EventLoop, logger logging.Logger, @@ -163,6 +123,7 @@ func NewClientIO( return srv } +// StartOnListener starts the gRPC server on the provided listener. func (srv *ClientIO) StartOnListener(lis net.Listener) { go func() { err := srv.srv.Serve(lis) @@ -172,24 +133,29 @@ func (srv *ClientIO) StartOnListener(lis net.Listener) { }() } +// Stop stops the gRPC server. func (srv *ClientIO) Stop() { srv.srv.Stop() } +// Hash returns the current hash of all executed commands. func (srv *ClientIO) Hash() hash.Hash { return srv.hash } +// CmdCount returns the total number of executed commands. func (srv *ClientIO) CmdCount() uint32 { return srv.cmdCount } +// ExecCommand receives a command from a client and adds it to the command cache. 
func (srv *ClientIO) ExecCommand(ctx gorums.ServerCtx, cmd *clientpb.Command) { srv.cmdCache.Add(cmd) - srv.statusTracker.SetStatus(cmd.ClientID, cmd.SequenceNumber, hotstuff.UNKNOWN) ctx.Release() } +// Exec executes a batch of commands, updating the hash and command count. +// It skips duplicate commands and marks successful executions in the status tracker. func (srv *ClientIO) Exec(batch *clientpb.Batch) { for _, cmd := range batch.GetCommands() { @@ -201,7 +167,7 @@ func (srv *ClientIO) Exec(batch *clientpb.Batch) { } srv.lastExecutedSeqNum[cmd.ClientID] = cmd.SequenceNumber // Mark command as executed in status tracker - srv.statusTracker.SetStatus(cmd.ClientID, cmd.SequenceNumber, hotstuff.EXECUTED) + srv.statusTracker.setSuccess(cmd.ClientID, cmd.SequenceNumber) _, _ = srv.hash.Write(cmd.Data) srv.cmdCount++ srv.mut.Unlock() @@ -209,37 +175,32 @@ func (srv *ClientIO) Exec(batch *clientpb.Batch) { srv.logger.Debugf("Hash: %.8x", srv.hash.Sum(nil)) } +// Abort marks a batch of commands as failed in the status tracker. func (srv *ClientIO) Abort(batch *clientpb.Batch) { for _, cmd := range batch.GetCommands() { srv.mut.Lock() // Mark command as aborted in status tracker - srv.statusTracker.SetStatus(cmd.ClientID, cmd.SequenceNumber, hotstuff.FAILED) + srv.statusTracker.addFailed(cmd.ClientID, cmd.SequenceNumber) srv.mut.Unlock() } } -// isDuplicate return true if the command has already been executed. +// isDuplicate returns true if the command has already been executed. // The caller must hold srv.mut.Lock(). func (srv *ClientIO) isDuplicate(cmd *clientpb.Command) bool { seqNum, ok := srv.lastExecutedSeqNum[cmd.ClientID] return ok && seqNum >= cmd.SequenceNumber } -// CleanupOldStatuses removes command status entries that are older than the given sequence number -// for a specific client. This should be called periodically to prevent unbounded memory growth. 
-func (srv *ClientIO) CleanupOldStatuses(clientID uint32, upToSeqNum uint64) { - srv.mut.Lock() - defer srv.mut.Unlock() - srv.statusTracker.Cleanup(clientID, upToSeqNum) -} - +// CommandStatus returns the execution status for a given command. +// It returns the highest executed sequence number and list of failed sequence numbers for the client. func (srv *ClientIO) CommandStatus(_ gorums.ServerCtx, in *clientpb.Command) (resp *clientpb.CommandStatusResponse, err error) { srv.mut.Lock() defer srv.mut.Unlock() - status := srv.statusTracker.GetStatus(in.ClientID, in.SequenceNumber) - srv.logger.Infof("Received CommandStatus request (client: %d, sequence: %d, status: %d)", in.ClientID, in.SequenceNumber, status) + CommandStatus := srv.statusTracker.GetClientStatuses(in.ClientID) + return &clientpb.CommandStatusResponse{ - Status: clientpb.CommandStatusResponse_Status(status), - Command: in, + HighestSequenceNumber: CommandStatus.HighestSuccess, + FailedSequenceNumbers: CommandStatus.FailedCmds, }, nil } diff --git a/server/clientio_test.go b/server/clientio_test.go index 1b9254409..723a76bd6 100644 --- a/server/clientio_test.go +++ b/server/clientio_test.go @@ -2,124 +2,87 @@ package server import ( "testing" - - "github.com/relab/hotstuff" ) -func TestCommandStatusTracker_GetStatus_DefaultExecuted(t *testing.T) { - tr := NewCommandStatusTracker() - status := tr.GetStatus(1, 1) - if status != hotstuff.EXECUTED { - t.Fatalf("expected default status EXECUTED for unknown client, got %v", status) - } -} - -func TestCommandStatusTracker_SetAndGet(t *testing.T) { +func TestCommandStatusTracker_SetSuccess(t *testing.T) { tr := NewCommandStatusTracker() clientID := uint32(1) - seq := uint64(10) - tr.SetStatus(clientID, seq, hotstuff.UNKNOWN) - if got := tr.GetStatus(clientID, seq); got != hotstuff.UNKNOWN { - t.Fatalf("GetStatus = %v; want %v", got, hotstuff.UNKNOWN) - } -} - -func TestCommandStatusTracker_ExtendWindow(t *testing.T) { - tr := NewCommandStatusTracker() - 
clientID := uint32(2) + tr.setSuccess(clientID, 10) + w := tr.GetClientStatuses(clientID) - // initial set at seq 1 - tr.SetStatus(clientID, 1, hotstuff.UNKNOWN) - if got := tr.GetStatus(clientID, 1); got != hotstuff.UNKNOWN { - t.Fatalf("initial GetStatus = %v; want %v", got, hotstuff.UNKNOWN) + if w.HighestSuccess != 10 { + t.Fatalf("HighestSuccess = %d; want 10", w.HighestSuccess) } - // set far ahead to force window growth - largeSeq := uint64(6000) - tr.SetStatus(clientID, largeSeq, hotstuff.FAILED) - if got := tr.GetStatus(clientID, largeSeq); got != hotstuff.FAILED { - t.Fatalf("after extend GetStatus(%d) = %v; want %v", largeSeq, got, hotstuff.FAILED) + // Setting lower sequence number should not update + tr.setSuccess(clientID, 5) + w = tr.GetClientStatuses(clientID) + if w.HighestSuccess != 10 { + t.Fatalf("HighestSuccess = %d; want 10", w.HighestSuccess) } - // earlier entry should still be present - if got := tr.GetStatus(clientID, 1); got != hotstuff.UNKNOWN { - t.Fatalf("after extend GetStatus(1) = %v; want %v", got, hotstuff.UNKNOWN) + // Setting higher should update + tr.setSuccess(clientID, 20) + w = tr.GetClientStatuses(clientID) + if w.HighestSuccess != 20 { + t.Fatalf("HighestSuccess = %d; want 20", w.HighestSuccess) } } -func TestCommandStatusTracker_CleanupSlidesAndDeletes(t *testing.T) { +func TestCommandStatusTracker_AddFailed(t *testing.T) { tr := NewCommandStatusTracker() - clientID := uint32(3) + tr.maxFailed = 3 // Set small limit for testing + clientID := uint32(2) - // set statuses for seqs 1..10 - for i := uint64(1); i <= 10; i++ { - tr.SetStatus(clientID, i, hotstuff.UNKNOWN) - } + tr.addFailed(clientID, 10) + tr.addFailed(clientID, 15) + tr.addFailed(clientID, 20) - // cleanup up to 4 -> base should become 5, entries 1..4 removed - tr.Cleanup(clientID, 4) - // seq 3 should now be treated as executed (cleaned up) - if got := tr.GetStatus(clientID, 3); got != hotstuff.EXECUTED { - t.Fatalf("after cleanup GetStatus(3) = %v; want 
EXECUTED", got) - } - // seq 5 should still be UNKNOWN - if got := tr.GetStatus(clientID, 5); got != hotstuff.UNKNOWN { - t.Fatalf("after cleanup GetStatus(5) = %v; want %v", got, hotstuff.UNKNOWN) + w := tr.GetClientStatuses(clientID) + if len(w.FailedCmds) != 3 { + t.Fatalf("FailedCmds length = %d; want 3", len(w.FailedCmds)) } - // cleanup up to 10 -> should remove entire window - tr.Cleanup(clientID, 10) - if got := tr.GetStatus(clientID, 5); got != hotstuff.EXECUTED { - t.Fatalf("after full cleanup GetStatus(5) = %v; want EXECUTED", got) + // Adding one more should drop the oldest + tr.addFailed(clientID, 25) + w = tr.GetClientStatuses(clientID) + if len(w.FailedCmds) != 3 { + t.Fatalf("FailedCmds length = %d; want 3", len(w.FailedCmds)) } - // snapshot should be empty for this client - // snapshot should not contain cleaned-up seqs 1..10 - if m := tr.GetClientStatuses(clientID); len(m) != 0 { - for i := uint64(1); i <= 10; i++ { - if _, ok := m[i]; ok { - t.Fatalf("snapshot contains cleaned-up seq %d", i) - } + + // Should contain 15, 20, 25 (10 should be dropped) + expected := []uint64{15, 20, 25} + for i, seq := range w.FailedCmds { + if seq != expected[i] { + t.Fatalf("FailedCmds[%d] = %d; want %d", i, seq, expected[i]) } - // It's an implementation detail whether the snapshot is empty or contains - // preallocated entries for sequences > cleanup point (which is why len(m) - // may be 4990). We only assert here that cleaned-up sequences are gone. 
} } -func TestCommandStatusTracker_IgnoreOldSetStatus(t *testing.T) { +func TestCommandStatusTracker_GetClientStatuses(t *testing.T) { tr := NewCommandStatusTracker() - clientID := uint32(4) + clientID := uint32(3) - // create window at seq 100 - tr.SetStatus(clientID, 100, hotstuff.UNKNOWN) - // attempt to set older seq 90 -> should be ignored - tr.SetStatus(clientID, 90, hotstuff.FAILED) - if got := tr.GetStatus(clientID, 90); got != hotstuff.EXECUTED { - t.Fatalf("old SetStatus should be ignored; GetStatus(90) = %v; want EXECUTED", got) + // New client should have empty window + w := tr.GetClientStatuses(clientID) + if w.HighestSuccess != 0 { + t.Fatalf("HighestSuccess = %d; want 0", w.HighestSuccess) } - // newer seq should still be present - if got := tr.GetStatus(clientID, 100); got != hotstuff.UNKNOWN { - t.Fatalf("GetStatus(100) = %v; want %v", got, hotstuff.UNKNOWN) + if len(w.FailedCmds) != 0 { + t.Fatalf("FailedCmds length = %d; want 0", len(w.FailedCmds)) } -} -func TestCommandStatusTracker_GetClientStatusesSnapshot(t *testing.T) { - tr := NewCommandStatusTracker() - clientID := uint32(5) + // Add some data + tr.setSuccess(clientID, 50) + tr.addFailed(clientID, 52) + tr.addFailed(clientID, 55) - tr.SetStatus(clientID, 50, hotstuff.UNKNOWN) - tr.SetStatus(clientID, 52, hotstuff.FAILED) - - snap := tr.GetClientStatuses(clientID) - if snap[50] != hotstuff.UNKNOWN { - t.Fatalf("snapshot[50] = %v; want %v", snap[50], hotstuff.UNKNOWN) - } - if snap[52] != hotstuff.FAILED { - t.Fatalf("snapshot[52] = %v; want %v", snap[52], hotstuff.FAILED) + w = tr.GetClientStatuses(clientID) + if w.HighestSuccess != 50 { + t.Fatalf("HighestSuccess = %d; want 50", w.HighestSuccess) } - // a sequence not set but within window will have zero value; ensure an unrelated seq returns EXECUTED via GetStatus - if got := tr.GetStatus(clientID, 49); got != hotstuff.EXECUTED { - t.Fatalf("GetStatus(49) = %v; want EXECUTED", got) + if len(w.FailedCmds) != 2 { + t.Fatalf("FailedCmds 
length = %d; want 2", len(w.FailedCmds)) } } diff --git a/types.go b/types.go index bae2baa35..e3d78fab8 100644 --- a/types.go +++ b/types.go @@ -373,13 +373,3 @@ type ReplicaInfo struct { Location string Metadata map[string]string } - -type CommandStatus uint8 - -const ( - UNKNOWN CommandStatus = iota - PENDING - COMMITTED - EXECUTED - FAILED -)