226 changes: 158 additions & 68 deletions client/client.go
@@ -9,6 +9,7 @@
"errors"
"io"
"math"
"slices"
"sync"
"time"

@@ -21,7 +22,6 @@
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/emptypb"
)

// ID is the identifier for a client.
@@ -31,17 +31,70 @@
faulty int
}

func (q *qspec) ExecCommandQF(_ *clientpb.Command, signatures map[uint32]*emptypb.Empty) (*emptypb.Empty, bool) {
if len(signatures) < q.faulty+1 {
// leastOverlapSet returns the set of uint64 values that appear in all slices.
func leastOverlapSet(slices [][]uint64) []uint64 {
if len(slices) == 0 {
return []uint64{}
}

// Count occurrences of each value across all slices
occurrences := make(map[uint64]int)
for _, slice := range slices {
seen := make(map[uint64]bool)
for _, val := range slice {
if !seen[val] {
occurrences[val]++
seen[val] = true
}
}
}

// Find values that appear in all slices
result := []uint64{}
numSlices := len(slices)
for val, count := range occurrences {
if count == numSlices {
result = append(result, val)
}
}

return result
}

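// CommandStatusQF combines status replies into a single response once at least
// f+1 replicas report the same highest sequence number; the failed sequence
// numbers are reduced to those common to the gathered failure sets.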
func (q *qspec) CommandStatusQF(command *clientpb.Command, replies map[uint32]*clientpb.CommandStatusResponse) (*clientpb.CommandStatusResponse, bool) {

Check failure on line 64 in client/client.go (GitHub Actions / lint): unused-parameter: parameter 'command' seems to be unused, consider removing or renaming it as _ (revive)

Reviewer comment (Member):

See also my comment below; I think this should be redesigned to keep track of how many commands each replica has executed and combine the results into a single reply that tells the client how many of its commands have actually been executed. For example, something along these lines (I'm not checking for nil or other failure modes here):

lowMin, highMin := uint64(0), uint64(math.MaxUint64)
for _, resp := range replies {
	if resp.LowSequenceNumber != command.LowSequenceNumber {
		// invalid reply??
	}
	lowMin = min(lowMin, resp.LowSequenceNumber)
	highMin = min(highMin, resp.HighSequenceNumber)
}

The logic above isn't quite right, since we need f+1 "proofs" of the highMin value.
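
A rough sketch (not part of this PR) of one way to obtain those f+1 proofs, assuming each reply's GetHighestSequenceNumber() is the highest contiguously executed command for that replica and each reply comes from a distinct replica: sort the reported values and take the (f+1)-th largest, which at least f+1 replicas vouch for.

highs := make([]uint64, 0, len(replies))
for _, reply := range replies {
	highs = append(highs, reply.GetHighestSequenceNumber())
}
if len(highs) < q.faulty+1 {
	return nil, false
}
slices.Sort(highs)
slices.Reverse(highs)
// highs[q.faulty] is the (f+1)-th largest reported value: at least f+1
// replicas claim a high watermark at or above it, giving the required proofs.
provenHigh := highs[q.faulty] // would become HighestSequenceNumber in the combined response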

if len(replies) < q.faulty+1 {
return nil, false
}
return &emptypb.Empty{}, true
successfulHighestCmds := make([]uint64, 0, len(replies))
commandCount := make(map[uint64]int)
failedCommandIdSets := make([][]uint64, 0, len(replies))

Check failure on line 70 in client/client.go (GitHub Actions / lint): ST1003: var failedCommandIdSets should be failedCommandIDSets (staticcheck)
for _, reply := range replies {
successfulHighestCmds = append(successfulHighestCmds, reply.GetHighestSequenceNumber())
_, ok := commandCount[reply.GetHighestSequenceNumber()]
if !ok {
commandCount[reply.GetHighestSequenceNumber()] = 1
continue
}
commandCount[reply.GetHighestSequenceNumber()]++
failedCommandIdSets = append(failedCommandIdSets, reply.GetFailedSequenceNumbers())
}
leastOverlapFailedCmds := leastOverlapSet(failedCommandIdSets)
slices.Sort(successfulHighestCmds)
slices.Reverse(successfulHighestCmds)
for _, cmd := range successfulHighestCmds {
if commandCount[cmd] >= q.faulty+1 {
return &clientpb.CommandStatusResponse{
HighestSequenceNumber: cmd,
FailedSequenceNumbers: leastOverlapFailedCmds,
}, true
}
}
return nil, false
}

type pendingCmd struct {

Check failure on line 95 in client/client.go (GitHub Actions / lint): type pendingCmd is unused (unused)
sequenceNumber uint64
sendTime time.Time
promise *clientpb.AsyncEmpty
cancelCtx context.CancelFunc
}

@@ -65,19 +118,19 @@
logger logging.Logger
id ID

mut sync.Mutex
mgr *clientpb.Manager
gorumsConfig *clientpb.Configuration
payloadSize uint32
highestCommitted uint64 // highest sequence number acknowledged by the replicas
pendingCmds chan pendingCmd
cancel context.CancelFunc
done chan struct{}
reader io.ReadCloser
limiter *rate.Limiter
stepUp float64
stepUpInterval time.Duration
timeout time.Duration
mut sync.Mutex
mgr *clientpb.Manager
gorumsConfig *clientpb.Configuration
payloadSize uint32
highestCommitted uint64 // highest sequence number acknowledged by the replicas
cancel context.CancelFunc
done chan struct{}
reader io.ReadCloser
limiter *rate.Limiter
stepUp float64
stepUpInterval time.Duration
timeout time.Duration
failedCommandIdSet *BitSet

Check failure on line 133 in client/client.go (GitHub Actions / lint): ST1003: struct field failedCommandIdSet should be failedCommandIDSet (staticcheck)
}

// New returns a new Client.
@@ -92,17 +145,16 @@
logger: logger,
id: id,

pendingCmds: make(chan pendingCmd, conf.MaxConcurrent),
highestCommitted: 1,
done: make(chan struct{}),
reader: conf.Input,
payloadSize: conf.PayloadSize,
limiter: rate.NewLimiter(rate.Limit(conf.RateLimit), 1),
stepUp: conf.RateStep,
stepUpInterval: conf.RateStepInterval,
timeout: conf.Timeout,
highestCommitted: 1,
done: make(chan struct{}),
reader: conf.Input,
payloadSize: conf.PayloadSize,
limiter: rate.NewLimiter(rate.Limit(conf.RateLimit), 1),
stepUp: conf.RateStep,
stepUpInterval: conf.RateStepInterval,
timeout: conf.Timeout,
failedCommandIdSet: NewBitSet(5000000), // assuming max 5 million commands for now
}

var creds credentials.TransportCredentials
if conf.TLS {
creds = credentials.NewClientTLSFromCert(conf.RootCAs, "")
@@ -125,6 +177,7 @@
}
c.gorumsConfig, err = c.mgr.NewConfiguration(&qspec{faulty: hotstuff.NumFaulty(len(replicas))}, gorums.WithNodeMap(nodes))
if err != nil {
c.logger.Error("unable to create the configuration in client")
c.mgr.Close()
return err
}
@@ -182,6 +235,7 @@
}

func (c *Client) close() {
// Signal the command handler to stop fetching statuses before closing the manager.
c.mgr.Close()
err := c.reader.Close()
if err != nil {
@@ -197,7 +251,6 @@
nextLogTime = time.Now().Add(time.Second)
)

loop:
for ctx.Err() == nil {

// step up the rate limiter
Expand Down Expand Up @@ -236,23 +289,12 @@
SequenceNumber: num,
Data: data[:n],
}

ctx, cancel := context.WithTimeout(ctx, c.timeout)
promise := c.gorumsConfig.ExecCommand(ctx, cmd)
pending := pendingCmd{sequenceNumber: num, sendTime: time.Now(), promise: promise, cancelCtx: cancel}

c.gorumsConfig.ExecCommand(context.Background(), cmd)
num++
select {
case c.pendingCmds <- pending:
case <-ctx.Done():
break loop
}

if time.Now().After(nextLogTime) {
c.logger.Infof("%d commands sent so far", num)
nextLogTime = time.Now().Add(time.Second)
}

}
return nil
}
@@ -262,42 +304,90 @@
// acknowledged in the order that they were sent.
func (c *Client) handleCommands(ctx context.Context) (executed, failed, timeout int) {
for {
var (
cmd pendingCmd
ok bool
)
statusRefresher := time.NewTicker(100 * time.Millisecond)
select {
case cmd, ok = <-c.pendingCmds:
if !ok {
return
case <-statusRefresher.C:
commandStatus, err := c.gorumsConfig.CommandStatus(ctx, &clientpb.Command{
ClientID: uint32(c.id),
})
if err != nil {
c.logger.Error("Failed to get command status: ", err)
continue
}
c.mut.Lock()
if c.highestCommitted < commandStatus.HighestSequenceNumber {
c.highestCommitted = commandStatus.HighestSequenceNumber
}
for _, failedSeqNum := range commandStatus.FailedSequenceNumbers {
c.failedCommandIdSet.Add(failedSeqNum)
}
failed = c.failedCommandIdSet.Count()
executed = int(c.highestCommitted) - failed
timeout = 0
c.mut.Unlock()
case <-ctx.Done():
return
}
_, err := cmd.promise.Get()
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
c.logger.Debug("Command timed out.")
timeout++
} else if !errors.Is(err, context.Canceled) {
c.logger.Debugf("Did not get enough replies for command: %v\n", err)
failed++
}
} else {
executed++
}
c.mut.Lock()
if cmd.sequenceNumber > c.highestCommitted {
c.highestCommitted = cmd.sequenceNumber
}
c.mut.Unlock()

duration := time.Since(cmd.sendTime)
c.eventLoop.AddEvent(LatencyMeasurementEvent{Latency: duration})
}
}

// LatencyMeasurementEvent represents a single latency measurement.
type LatencyMeasurementEvent struct {
Latency time.Duration
}

// BitSet is a space-efficient set for uint64 values
type BitSet struct {
mut sync.Mutex
bits []uint64
}

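// NewBitSet returns a BitSet sized to hold the values 0 through maxVal.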
func NewBitSet(maxVal uint64) *BitSet {
size := (maxVal / 64) + 1
return &BitSet{
bits: make([]uint64, size),
}
}

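// Add inserts val into the set. Values outside the preallocated range are silently ignored.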
func (bs *BitSet) Add(val uint64) {
bs.mut.Lock()
defer bs.mut.Unlock()

index := val / 64
offset := val % 64
if index < uint64(len(bs.bits)) {
bs.bits[index] |= (1 << offset)
}
}

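// Contains reports whether val has been added to the set.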
func (bs *BitSet) Contains(val uint64) bool {
bs.mut.Lock()
defer bs.mut.Unlock()

index := val / 64
offset := val % 64
if index < uint64(len(bs.bits)) {
return (bs.bits[index] & (1 << offset)) != 0
}
return false
}

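// Count returns the number of values currently in the set.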
func (bs *BitSet) Count() int {
bs.mut.Lock()
defer bs.mut.Unlock()

count := 0
for _, word := range bs.bits {
count += popcount(word)
}
return count
}

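// popcount returns the number of set bits in x by clearing the lowest set bit
// on each iteration (Kernighan's method).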
func popcount(x uint64) int {
count := 0
for x != 0 {
x &= x - 1
count++
}
return count
}