Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion drpcmanager/active_streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func testMuxWriter(t *testing.T) *drpcwire.MuxWriter {
}

func testStream(t *testing.T, id uint64) *drpcstream.Stream {
return drpcstream.New(context.Background(), id, testMuxWriter(t))
return drpcstream.New(context.Background(), id, testMuxWriter(t), drpcwire.NewBufferPool())
}

func TestActiveStreams_AddAndGet(t *testing.T) {
Expand Down
27 changes: 18 additions & 9 deletions drpcmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

"github.com/zeebo/errs"
grpcmetadata "google.golang.org/grpc/metadata"

"storj.io/drpc"
"storj.io/drpc/drpcdebug"
"storj.io/drpc/drpcmetadata"
Expand Down Expand Up @@ -61,7 +60,8 @@ type Manager struct {
wg sync.WaitGroup // tracks active manageStream goroutines

// streams tracks active streams.
streams *activeStreams
streams *activeStreams
recvPool *drpcwire.BufferPool

pdone drpcsignal.Chan // signals when NewServerStream has registered the new stream
invokes chan invokeInfo // completed invoke info from manageReader to NewServerStream
Expand Down Expand Up @@ -130,6 +130,7 @@ func NewWithOptions(tr drpc.Transport, kind ManagerKind, opts Options) *Manager
m.pendingStreams = make(map[uint64]*pendingStream)

m.streams = newActiveStreams()
m.recvPool = drpcwire.NewBufferPool()

// set the internal stream options
drpcopts.SetStreamTransport(&m.opts.Stream.Internal, m.tr)
Expand Down Expand Up @@ -220,7 +221,7 @@ func (m *Manager) manageReader() {
func (m *Manager) handleInvokeFrame(fr drpcwire.Frame) error {
ps, ok := m.pendingStreams[fr.ID.Stream]
if !ok {
ps = &pendingStream{pa: drpcwire.NewPacketAssembler()}
ps = &pendingStream{pa: drpcwire.NewPacketAssembler(m.recvPool)}
m.pendingStreams[fr.ID.Stream] = ps
}
pkt, packetReady, err := ps.pa.AppendFrame(fr)
Expand All @@ -233,7 +234,8 @@ func (m *Manager) handleInvokeFrame(fr drpcwire.Frame) error {

// Metadata arrives before invoke; accumulate it and wait for the invoke.
if pkt.Kind == drpcwire.KindInvokeMetadata {
meta, err := drpcmetadata.Decode(pkt.Data)
meta, err := drpcmetadata.Decode(*pkt.Data)
m.recvPool.Put(pkt.Data)
if err != nil {
return err
}
Expand All @@ -243,11 +245,12 @@ func (m *Manager) handleInvokeFrame(fr drpcwire.Frame) error {

// Invoke packet completes the sequence. Send to NewServerStream.
select {
case m.invokes <- invokeInfo{sid: pkt.ID.Stream, data: pkt.Data, metadata: ps.metadata}:
case m.invokes <- invokeInfo{sid: pkt.ID.Stream, data: *pkt.Data, metadata: ps.metadata}:
// Wait for NewServerStream to finish stream creation before reading the
// next frame. This guarantees curr is set for subsequent non-invoke
// packets.
m.pdone.Recv()
m.recvPool.Put(pkt.Data)
// TODO: reuse pending stream
delete(m.pendingStreams, fr.ID.Stream)
case <-m.sigs.term.Signal():
Expand All @@ -260,15 +263,17 @@ func (m *Manager) handleInvokeFrame(fr drpcwire.Frame) error {
//

// newStream creates a stream value with the appropriate configuration for this manager.
func (m *Manager) newStream(ctx context.Context, sid uint64, kind drpc.StreamKind, rpc string) (*drpcstream.Stream, error) {
func (m *Manager) newStream(
ctx context.Context, sid uint64, kind drpc.StreamKind, rpc string,
) (*drpcstream.Stream, error) {
opts := m.opts.Stream
drpcopts.SetStreamKind(&opts.Internal, kind)
drpcopts.SetStreamRPC(&opts.Internal, rpc)
if cb := drpcopts.GetManagerStatsCB(&m.opts.Internal); cb != nil {
drpcopts.SetStreamStats(&opts.Internal, cb(rpc))
}

stream := drpcstream.NewWithOptions(ctx, sid, m.wr, opts)
stream := drpcstream.NewWithOptions(ctx, sid, m.wr, m.recvPool, opts)

if err := m.streams.Add(sid, stream); err != nil {
return nil, err
Expand Down Expand Up @@ -334,7 +339,9 @@ func (m *Manager) Close() error {
}

// NewClientStream starts a stream on the managed transport for use by a client.
func (m *Manager) NewClientStream(ctx context.Context, rpc string) (stream *drpcstream.Stream, err error) {
func (m *Manager) NewClientStream(
ctx context.Context, rpc string,
) (stream *drpcstream.Stream, err error) {
if err := ctx.Err(); err != nil {
return nil, err
}
Expand All @@ -344,7 +351,9 @@ func (m *Manager) NewClientStream(ctx context.Context, rpc string) (stream *drpc
// NewServerStream starts a stream on the managed transport for use by a server.
// It does this by waiting for the client to issue an invoke message and
// returning the details.
func (m *Manager) NewServerStream(ctx context.Context) (stream *drpcstream.Stream, rpc string, err error) {
func (m *Manager) NewServerStream(
ctx context.Context,
) (stream *drpcstream.Stream, rpc string, err error) {
select {
case <-ctx.Done():
return nil, "", ctx.Err()
Expand Down
78 changes: 35 additions & 43 deletions drpcstream/ring_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@

package drpcstream

import "sync"
import (
"sync"

"storj.io/drpc/drpcwire"
)

// defaultRingBufferCapacity is the number of messages the ring buffer can
// hold before the producer blocks. This decouples the transport reader
Expand All @@ -15,11 +19,13 @@ const defaultRingBufferCapacity = 256

// ringBuffer is a bounded single-producer / single-consumer FIFO queue for
// assembled packet data. It sits between manageReader (producer, calls
// Enqueue) and the application goroutine (consumer, calls Dequeue/Done).
// Enqueue) and the application goroutine (consumer, calls Dequeue).
//
// Slots are pre-allocated and reused: each slot's backing array grows via
// append to fit incoming data, then stays at its high-water mark, avoiding
// per-message allocation in steady state.
// Buffers are obtained from a shared BufferPool. Enqueue copies data into a
// pooled buffer, while EnqueueOwned takes ownership of an already-pooled buffer
// without copying; Dequeue returns ownership of that buffer to the caller and
// advances the tail immediately. The caller is responsible for returning the
// buffer to the pool via BufferPool.Put.
//
// After Close, Dequeue drains any queued messages before returning the close
// error. This ensures graceful shutdown (KindClose/KindCloseSend) delivers
Expand All @@ -28,44 +34,46 @@ type ringBuffer struct {
mu sync.Mutex
cond sync.Cond

buf [][]byte // ring of byte slices
head int // next write position (producer)
tail int // next read position (consumer)
count int // number of occupied slots
pool *drpcwire.BufferPool // shared pool; nil means allocate fresh each time
buf []*[]byte // ring of pooled buffer pointers
head int // next write position (producer)
tail int // next read position (consumer)
count int // number of occupied slots

held bool // true between Dequeue and Done
err error // terminal error, set by Close
err error // terminal error, set by Close
}

func (rb *ringBuffer) init() {
func (rb *ringBuffer) init(pool *drpcwire.BufferPool) {
rb.cond.L = &rb.mu
rb.buf = make([][]byte, defaultRingBufferCapacity)
rb.pool = pool
rb.buf = make([]*[]byte, defaultRingBufferCapacity)
}

// Enqueue copies data into the next write slot. If the buffer is full, it
// blocks until a slot is freed or the buffer is closed. If the buffer is
// closed, Enqueue returns silently without enqueuing.
func (rb *ringBuffer) Enqueue(data []byte) {
// Enqueue copies data into a pooled buffer and places it in the next write
// slot. If the buffer is full, it blocks until a slot is freed or the buffer
// is closed. If the buffer is closed, Enqueue returns silently.
func (rb *ringBuffer) Enqueue(data *[]byte) {
rb.mu.Lock()
defer rb.mu.Unlock()

for rb.count == len(rb.buf) && rb.err == nil {
rb.cond.Wait()
}
if rb.err != nil {
rb.pool.Put(data)
return
}

rb.buf[rb.head] = append(rb.buf[rb.head][:0], data...)
rb.buf[rb.head] = data
rb.head = (rb.head + 1) % len(rb.buf)
rb.count++
rb.cond.Broadcast()
}

// Dequeue returns the data from the next read slot. If the buffer is empty,
// it blocks until data is available or the buffer is closed. The returned
// slice is valid until Done is called.
func (rb *ringBuffer) Dequeue() ([]byte, error) {
// Dequeue returns the next buffered message. The returned *[]byte is owned
// by the caller; the tail is advanced immediately. If the ring buffer has a
// pool, the caller should return the buffer via BufferPool.Put when done.
func (rb *ringBuffer) Dequeue() (*[]byte, error) {
rb.mu.Lock()
defer rb.mu.Unlock()

Expand All @@ -76,37 +84,21 @@ func (rb *ringBuffer) Dequeue() ([]byte, error) {
return nil, rb.err
}

rb.held = true
return rb.buf[rb.tail], nil
}

// Done advances the read pointer, making the slot available for reuse.
// It must be called exactly once after each successful Dequeue.
//
// TODO(shubham): remove this method once a shared buffer pool is introduced.
// With a pool, Dequeue will advance the tail immediately and the caller will
// return the buffer to the pool directly.
func (rb *ringBuffer) Done() {
rb.mu.Lock()
defer rb.mu.Unlock()

b := rb.buf[rb.tail]
rb.buf[rb.tail] = nil
rb.tail = (rb.tail + 1) % len(rb.buf)
rb.count--
rb.held = false
rb.cond.Broadcast()

return b, nil
}

// Close marks the buffer as closed with the given error. All blocked Enqueue
// and Dequeue calls are woken and will return. Close waits for any in-progress
// Dequeue/Done pair to complete before setting the error. Subsequent calls are
// no-ops.
// and Dequeue calls are woken and will return. Subsequent calls are no-ops.
func (rb *ringBuffer) Close(err error) {
rb.mu.Lock()
defer rb.mu.Unlock()

for rb.held {
rb.cond.Wait()
}
if rb.err != nil {
return
}
Expand Down
Loading