Skip to content

Commit e017bd9

Browse files
authored
Ensure ConnectionPool.triggerForceShutdown() works and that it doesn't shutdown until all connections are closed (#607)
1 parent 9217854 commit e017bd9

File tree

8 files changed

+490
-31
lines changed

8 files changed

+490
-31
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ DerivedData
66
Package.resolved
77
.swiftpm
88
Tests/LinuxMain.swift
9+
.vscode/

Sources/ConnectionPoolModule/ConnectionPool.swift

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -390,12 +390,16 @@ public final class ConnectionPool<
390390
self.closeConnection(connection)
391391
self.cancelTimers(timers)
392392

393-
case .shutdown(let cleanup):
393+
case .initiateShutdown(let cleanup):
394394
for connection in cleanup.connections {
395395
self.closeConnection(connection)
396396
}
397397
self.cancelTimers(cleanup.timersToCancel)
398398

399+
case .cancelEventStreamAndFinalCleanup(let timersToCancel):
400+
self.cancelTimers(timersToCancel)
401+
self.eventContinuation.finish()
402+
399403
case .none:
400404
break
401405
}

Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift

Lines changed: 73 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ extension PoolStateMachine {
9898
self.keepAliveReducesAvailableStreams = keepAliveReducesAvailableStreams
9999
}
100100

101+
@usableFromInline
101102
var isEmpty: Bool {
102103
self.connections.isEmpty
103104
}
@@ -397,7 +398,9 @@ extension PoolStateMachine {
397398
return nil
398399
}
399400

400-
let connectionInfo = self.connections[index].release(streams: streams)
401+
guard let connectionInfo = self.connections[index].release(streams: streams) else {
402+
return nil
403+
}
401404
self.stats.availableStreams += streams
402405
self.stats.leasedStreams -= streams
403406
switch connectionInfo {
@@ -514,6 +517,53 @@ extension PoolStateMachine {
514517
)
515518
}
516519

520+
@usableFromInline
521+
enum CloseConnectionAction {
522+
case close(CloseAction)
523+
case cancelTimers(Max2Sequence<TimerCancellationToken>)
524+
case doNothing
525+
}
526+
/// Closes the connection at the given index.
527+
@inlinable
528+
mutating func closeConnection(at index: Int) -> CloseConnectionAction {
529+
guard let closeAction = self.connections[index].close() else {
530+
return .doNothing // no action to take
531+
}
532+
533+
self.stats.runningKeepAlive -= closeAction.runningKeepAlive ? 1 : 0
534+
self.stats.availableStreams -= closeAction.maxStreams - closeAction.usedStreams
535+
536+
switch closeAction.previousConnectionState {
537+
case .idle:
538+
self.stats.idle -= 1
539+
self.stats.closing += 1
540+
541+
case .leased:
542+
self.stats.leased -= 1
543+
self.stats.closing += 1
544+
545+
case .closing:
546+
break
547+
548+
case .backingOff:
549+
self.stats.backingOff -= 1
550+
}
551+
552+
if let connection = closeAction.connection {
553+
return .close(CloseAction(
554+
connection: connection,
555+
timersToCancel: closeAction.cancelTimers
556+
))
557+
} else {
558+
// if there is no connection we should delete this now
559+
var timersToCancel = closeAction.cancelTimers
560+
if let cancellationTimer = self.swapForDeletion(index: index) {
561+
timersToCancel.append(cancellationTimer)
562+
}
563+
return .cancelTimers(timersToCancel)
564+
}
565+
}
566+
517567
@inlinable
518568
mutating func closeConnectionIfIdle(_ connectionID: Connection.ID) -> CloseAction? {
519569
guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else {
@@ -532,6 +582,16 @@ extension PoolStateMachine {
532582
return self.closeConnectionIfIdle(at: index)
533583
}
534584

585+
@inlinable
586+
mutating func destroyFailedConnection(_ connectionID: Connection.ID) -> TimerCancellationToken? {
587+
guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else {
588+
preconditionFailure("Failing a connection we don't have a record of.")
589+
}
590+
591+
self.connections[index].destroyFailedConnection()
592+
return self.swapForDeletion(index: index)
593+
}
594+
535595
/// Information around the failed/closed connection.
536596
@usableFromInline
537597
struct ClosedAction {
@@ -567,7 +627,7 @@ extension PoolStateMachine {
567627
/// supplied index after this. If nil is returned the connection was closed by the state machine and was
568628
/// therefore already removed.
569629
@inlinable
570-
mutating func connectionClosed(_ connectionID: Connection.ID) -> ClosedAction {
630+
mutating func connectionClosed(_ connectionID: Connection.ID, shuttingDown: Bool) -> ClosedAction {
571631
guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else {
572632
preconditionFailure("All connections that have been created should say goodbye exactly once!")
573633
}
@@ -597,7 +657,7 @@ extension PoolStateMachine {
597657
}
598658

599659
let newConnectionRequest: ConnectionRequest?
600-
if self.connections.count < self.minimumConcurrentConnections {
660+
if !shuttingDown, self.connections.count < self.minimumConcurrentConnections {
601661
newConnectionRequest = self.createNewConnection()
602662
} else {
603663
newConnectionRequest = .none
@@ -613,18 +673,19 @@ extension PoolStateMachine {
613673
// MARK: Shutdown
614674

615675
mutating func triggerForceShutdown(_ cleanup: inout ConnectionAction.Shutdown) {
616-
for var connectionState in self.connections {
617-
guard let closeAction = connectionState.close() else {
618-
continue
619-
}
676+
for index in self.connections.indices {
677+
switch closeConnection(at: index) {
678+
case .close(let closeAction):
679+
cleanup.connections.append(closeAction.connection)
680+
cleanup.timersToCancel.append(contentsOf: closeAction.timersToCancel)
681+
682+
case .cancelTimers(let timers):
683+
cleanup.timersToCancel.append(contentsOf: timers)
620684

621-
if let connection = closeAction.connection {
622-
cleanup.connections.append(connection)
685+
case .doNothing:
686+
break
623687
}
624-
cleanup.timersToCancel.append(contentsOf: closeAction.cancelTimers)
625688
}
626-
627-
self.connections = []
628689
}
629690

630691
// MARK: - Private functions -

Sources/ConnectionPoolModule/PoolStateMachine+ConnectionState.swift

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ extension PoolStateMachine {
396396
}
397397

398398
@inlinable
399-
mutating func release(streams returnedStreams: UInt16) -> ConnectionAvailableInfo {
399+
mutating func release(streams returnedStreams: UInt16) -> ConnectionAvailableInfo? {
400400
switch self.state {
401401
case .leased(let connection, let usedStreams, let maxStreams, let keepAlive):
402402
precondition(usedStreams >= returnedStreams)
@@ -409,7 +409,11 @@ extension PoolStateMachine {
409409
self.state = .leased(connection, usedStreams: newUsedStreams, maxStreams: maxStreams, keepAlive: keepAlive)
410410
return .leased(availableStreams: availableStreams)
411411
}
412-
case .backingOff, .starting, .idle, .closing, .closed:
412+
413+
case .closing:
414+
return nil
415+
416+
case .backingOff, .starting, .idle, .closed:
413417
preconditionFailure("Invalid state: \(self.state)")
414418
}
415419
}
@@ -587,10 +591,21 @@ extension PoolStateMachine {
587591
runningKeepAlive: keepAlive.isRunning
588592
)
589593

590-
case .leased, .closed:
594+
case .leased, .closed, .closing:
591595
return nil
592596

593-
case .backingOff, .starting, .closing:
597+
case .backingOff, .starting:
598+
preconditionFailure("Invalid state: \(self.state)")
599+
}
600+
}
601+
602+
@inlinable
603+
mutating func destroyFailedConnection() {
604+
switch self.state {
605+
case .starting:
606+
self.state = .closed
607+
608+
case .idle, .leased, .closed, .closing, .backingOff:
594609
preconditionFailure("Invalid state: \(self.state)")
595610
}
596611
}

Sources/ConnectionPoolModule/PoolStateMachine.swift

Lines changed: 77 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,10 @@ struct PoolStateMachine<
8888
case runKeepAlive(Connection, TimerCancellationToken?)
8989
case cancelTimers(TinyFastSequence<TimerCancellationToken>)
9090
case closeConnection(Connection, Max2Sequence<TimerCancellationToken>)
91-
case shutdown(Shutdown)
92-
91+
/// Start process of shutting down the connection pool. Close connections, cancel timers.
92+
case initiateShutdown(Shutdown)
93+
/// All connections have been closed, the pool event stream can be ended.
94+
case cancelEventStreamAndFinalCleanup([TimerCancellationToken])
9395
case none
9496
}
9597

@@ -256,11 +258,16 @@ struct PoolStateMachine<
256258
@inlinable
257259
mutating func connectionEstablished(_ connection: Connection, maxStreams: UInt16) -> Action {
258260
switch self.poolState {
259-
case .running, .shuttingDown(graceful: true):
261+
case .running:
260262
let (index, context) = self.connections.newConnectionEstablished(connection, maxStreams: maxStreams)
261263
return self.handleAvailableConnection(index: index, availableContext: context)
262-
case .shuttingDown(graceful: false), .shutDown:
263-
return .init(request: .none, connection: .closeConnection(connection, []))
264+
265+
case .shuttingDown:
266+
let (index, context) = self.connections.newConnectionEstablished(connection, maxStreams: maxStreams)
267+
return self.handleAvailableConnection(index: index, availableContext: context)
268+
269+
case .shutDown:
270+
fatalError("Connection pool is not running")
264271
}
265272
}
266273

@@ -326,7 +333,18 @@ struct PoolStateMachine<
326333
return .init(request: .none, connection: .scheduleTimers(.init(timer)))
327334

328335
case .shuttingDown(graceful: false), .shutDown:
329-
return .none()
336+
let timerToCancel = self.connections.destroyFailedConnection(request.connectionID)
337+
let connectionAction: ConnectionAction
338+
if self.connections.isEmpty {
339+
self.poolState = .shutDown
340+
connectionAction = .cancelEventStreamAndFinalCleanup(timerToCancel.map {[$0]} ?? [])
341+
} else {
342+
connectionAction = .cancelTimers(timerToCancel.map {[$0]} ?? [])
343+
}
344+
return .init(
345+
request: .none,
346+
connection: connectionAction
347+
)
330348
}
331349
}
332350

@@ -348,7 +366,17 @@ struct PoolStateMachine<
348366
return .init(request: .none, connection: .makeConnection(request, timers))
349367

350368
case .cancelTimers(let timers):
351-
return .init(request: .none, connection: .cancelTimers(.init(timers)))
369+
let connectionAction: ConnectionAction
370+
if self.connections.isEmpty {
371+
self.poolState = .shutDown
372+
connectionAction = .cancelEventStreamAndFinalCleanup(.init(timers))
373+
} else {
374+
connectionAction = .cancelTimers(.init(timers))
375+
}
376+
return .init(
377+
request: .none,
378+
connection: connectionAction
379+
)
352380
}
353381

354382
case .shuttingDown(graceful: false), .shutDown:
@@ -403,7 +431,7 @@ struct PoolStateMachine<
403431
case .running, .shuttingDown(graceful: true):
404432
self.cacheNoMoreConnectionsAllowed = false
405433

406-
let closedConnectionAction = self.connections.connectionClosed(connection.id)
434+
let closedConnectionAction = self.connections.connectionClosed(connection.id, shuttingDown: false)
407435

408436
let connectionAction: ConnectionAction
409437
if let newRequest = closedConnectionAction.newConnectionRequest {
@@ -414,7 +442,19 @@ struct PoolStateMachine<
414442

415443
return .init(request: .none, connection: connectionAction)
416444

417-
case .shuttingDown(graceful: false), .shutDown:
445+
case .shuttingDown(graceful: false):
446+
let closedConnectionAction = self.connections.connectionClosed(connection.id, shuttingDown: true)
447+
448+
let connectionAction: ConnectionAction
449+
if self.connections.isEmpty {
450+
self.poolState = .shutDown
451+
connectionAction = .cancelEventStreamAndFinalCleanup(.init(closedConnectionAction.timersToCancel))
452+
} else {
453+
connectionAction = .cancelTimers(closedConnectionAction.timersToCancel)
454+
}
455+
456+
return .init(request: .none, connection: connectionAction)
457+
case .shutDown:
418458
return .none()
419459
}
420460
}
@@ -442,13 +482,17 @@ struct PoolStateMachine<
442482
var shutdown = ConnectionAction.Shutdown()
443483
self.connections.triggerForceShutdown(&shutdown)
444484

445-
if shutdown.connections.isEmpty {
485+
if self.connections.isEmpty, shutdown.connections.isEmpty {
446486
self.poolState = .shutDown
487+
return .init(
488+
request: .failRequests(self.requestQueue.removeAll(), ConnectionPoolError.poolShutdown),
489+
connection: .cancelEventStreamAndFinalCleanup(shutdown.timersToCancel)
490+
)
447491
}
448492

449493
return .init(
450494
request: .failRequests(self.requestQueue.removeAll(), ConnectionPoolError.poolShutdown),
451-
connection: .shutdown(shutdown)
495+
connection: .initiateShutdown(shutdown)
452496
)
453497

454498
case .shuttingDown:
@@ -481,6 +525,22 @@ struct PoolStateMachine<
481525
return .none()
482526

483527
case .idle(_, let newIdle):
528+
if case .shuttingDown = self.poolState {
529+
switch self.connections.closeConnection(at: index) {
530+
case .close(let closeAction):
531+
return .init(
532+
request: .none,
533+
connection: .closeConnection(closeAction.connection, closeAction.timersToCancel)
534+
)
535+
case .cancelTimers(let timers):
536+
return .init(
537+
request: .none,
538+
connection: .cancelTimers(.init(timers))
539+
)
540+
case .doNothing:
541+
return .none()
542+
}
543+
}
484544
let timers = self.connections.parkConnection(at: index, hasBecomeIdle: newIdle).map(self.mapTimers)
485545

486546
return .init(
@@ -569,6 +629,9 @@ extension PoolStateMachine {
569629
@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
570630
extension PoolStateMachine.Action: Equatable where TimerCancellationToken: Equatable, Request: Equatable {}
571631

632+
@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
633+
extension PoolStateMachine.PoolState: Equatable {}
634+
572635
@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
573636
extension PoolStateMachine.ConnectionAction: Equatable where TimerCancellationToken: Equatable {
574637
@usableFromInline
@@ -582,7 +645,9 @@ extension PoolStateMachine.ConnectionAction: Equatable where TimerCancellationTo
582645
return lhsConn === rhsConn && lhsToken == rhsToken
583646
case (.closeConnection(let lhsConn, let lhsTimers), .closeConnection(let rhsConn, let rhsTimers)):
584647
return lhsConn === rhsConn && lhsTimers == rhsTimers
585-
case (.shutdown(let lhs), .shutdown(let rhs)):
648+
case (.initiateShutdown(let lhs), .initiateShutdown(let rhs)):
649+
return lhs == rhs
650+
case (.cancelEventStreamAndFinalCleanup(let lhs), .cancelEventStreamAndFinalCleanup(let rhs)):
586651
return lhs == rhs
587652
case (.cancelTimers(let lhs), .cancelTimers(let rhs)):
588653
return lhs == rhs

0 commit comments

Comments
 (0)