fix(oauth): add thread-safe token refresh with double-checked locking #883
base: main
Conversation
When multiple streams start concurrently with an expired token, they can all detect that the token is expired and attempt to refresh simultaneously. For single-use refresh tokens, this causes failures because the refresh token is invalidated after the first successful refresh.

This fix adds a class-level lock to AbstractOauth2Authenticator that serializes token refresh attempts. The double-checked locking pattern ensures that:
1. Only one thread performs the actual refresh
2. Other threads wait and use the newly refreshed token
3. No redundant refresh attempts occur

Changes:
- Add _token_refresh_lock class-level threading.Lock to AbstractOauth2Authenticator
- Update get_access_token() in AbstractOauth2Authenticator to use double-checked locking
- Update get_access_token() in SingleUseRefreshTokenOauth2Authenticator with same pattern
- Add unit tests for concurrent token refresh scenarios

Co-Authored-By: [email protected] <[email protected]>
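The pattern described in this commit message can be sketched roughly as follows. This is a minimal illustration, not the CDK code: `SketchAuthenticator`, the shared `config` dict with its keys, and `run_demo` are all hypothetical names, and the sketch assumes every instance reads and writes token state in one shared dict.

```python
import threading
import time


class SketchAuthenticator:
    """Hypothetical stand-in for an OAuth authenticator (not the CDK class)."""

    # Class-level lock: one lock object shared by every instance in the process.
    _token_refresh_lock = threading.Lock()

    def __init__(self, shared_config):
        # Assumption: all instances share token state through one dict,
        # mirroring streams that share credentials via the connector config.
        self._config = shared_config

    def token_has_expired(self):
        return time.monotonic() >= self._config["expires_at"]

    def refresh_access_token(self):
        # Simulated slow token-endpoint call; only invoked while the lock is held.
        self._config["refresh_calls"] += 1
        time.sleep(0.05)
        return "token-%d" % self._config["refresh_calls"], 3600.0

    def get_access_token(self):
        if self.token_has_expired():  # first check, no lock held
            with self._token_refresh_lock:
                if self.token_has_expired():  # second check, lock held
                    token, expires_in = self.refresh_access_token()
                    self._config["access_token"] = token
                    self._config["expires_at"] = time.monotonic() + expires_in
        return self._config["access_token"]


def run_demo(num_threads=5):
    """Start num_threads callers, each with its own authenticator instance."""
    config = {"access_token": None, "expires_at": 0.0, "refresh_calls": 0}
    tokens = []

    def worker():
        tokens.append(SketchAuthenticator(config).get_access_token())

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return config["refresh_calls"], tokens
```

Calling `run_demo()` starts five threads, each with its own instance. Because the second expiry check happens under the shared lock, only the first thread to acquire it refreshes; the rest see the updated expiry and reuse the stored token.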
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1769515185-thread-safe-oauth-refresh#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1769515185-thread-safe-oauth-refresh
PyTest Results (Full): 3 831 tests, 3 819 ✅, 10m 22s ⏱️. Results for commit c881893. ♻️ This comment has been updated with latest results.
Pull request overview
This PR fixes a race condition in OAuth token refresh by implementing thread-safe double-checked locking. When multiple streams start concurrently with an expired token, they would all attempt to refresh simultaneously, causing failures with single-use refresh tokens since the token is invalidated after the first successful refresh.
Changes:
- Added a class-level threading.Lock to AbstractOauth2Authenticator to synchronize token refresh across all instances
- Implemented double-checked locking pattern in both AbstractOauth2Authenticator.get_access_token() and SingleUseRefreshTokenOauth2Authenticator.get_access_token()
- Added comprehensive unit tests verifying that 5 concurrent threads result in exactly 1 refresh call
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py | Added class-level lock and double-checked locking in base authenticator |
| airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py | Added double-checked locking to SingleUseRefreshTokenOauth2Authenticator |
| unit_tests/sources/streams/http/requests_native_auth/test_requests_native_auth.py | Added concurrent token refresh tests for both authenticator types |
    # through the connector config. Without this lock, concurrent refresh attempts can cause race
    # conditions where one stream successfully refreshes the token while others fail because the
    # refresh token has been invalidated (especially for single-use refresh tokens).
    _token_refresh_lock: threading.Lock = threading.Lock()
Copilot
AI
Jan 27, 2026
The class-level lock creates a global bottleneck where ALL authenticator instances across different OAuth flows will block each other during token refresh. This means unrelated OAuth configurations (e.g., different API credentials for different connectors running in the same process) will unnecessarily wait for each other. Consider using an instance-level lock instead, or if multiple streams truly share the same credentials, document this assumption clearly and consider grouping authenticators by credential identity.
The class-level lock is intentional and necessary for this fix. Here's why:
The problem: Each stream gets its own authenticator instance, but all instances share the same refresh token from the connector config. When Stream A's authenticator refreshes the token, it invalidates the refresh token that Stream B's authenticator is also trying to use.
Why instance-level lock doesn't work: An instance-level lock would only protect concurrent calls to the same instance. Since each stream has its own authenticator instance, an instance-level lock would NOT prevent the race condition between different streams.
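The difference between the two lock scopes comes down to object identity, which a short sketch can show. The class names here are hypothetical and are not the CDK classes; the point is only that a lock defined in the class body is one object shared by all instances (and by subclasses that do not redefine it), while a lock created in `__init__` is distinct per instance.

```python
import threading


class ClassLevel:
    lock = threading.Lock()  # created once, when the class body runs


class ClassLevelChild(ClassLevel):
    pass  # inherits the very same lock object


class InstanceLevel:
    def __init__(self):
        self.lock = threading.Lock()  # a new lock for every instance


# Two "streams" with class-level locking contend on one lock.
stream_a, stream_b = ClassLevel(), ClassLevelChild()
shared = stream_a.lock is stream_b.lock

# Two "streams" with instance-level locking never contend with each other.
per_stream_a, per_stream_b = InstanceLevel(), InstanceLevel()
separate = per_stream_a.lock is not per_stream_b.lock
```

Both `shared` and `separate` evaluate to `True`, which is why an instance-level lock cannot serialize refreshes issued from different authenticator instances.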
Why this isn't a problem in practice: Airbyte runs each connector in its own process/container, so different connectors won't share the same class. The lock only affects authenticator instances within the same connector process, which is exactly what we need.
The comment in the code documents this assumption. If there's ever a use case where truly independent OAuth flows run in the same process and shouldn't block each other, we could consider a more sophisticated approach (e.g., keying locks by credential identity), but that would add complexity for a scenario that doesn't currently exist in the Airbyte architecture.
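For reference, the "keying locks by credential identity" idea mentioned above could look something like the sketch below. It is not part of this PR; the function `lock_for_credential`, the module-level registry, and the choice of `(client_id, token_refresh_endpoint)` as the key are all assumptions made for illustration.

```python
import threading

# Guards the registry itself so two threads cannot create
# different locks for the same key.
_registry_guard = threading.Lock()
_locks_by_credential = {}


def lock_for_credential(client_id, token_refresh_endpoint):
    """Return the one lock associated with this (hypothetical) credential key."""
    key = (client_id, token_refresh_endpoint)
    with _registry_guard:
        lock = _locks_by_credential.get(key)
        if lock is None:
            lock = threading.Lock()
            _locks_by_credential[key] = lock
        return lock
```

Authenticators configured with the same key would serialize their refreshes on one lock, while unrelated OAuth flows in the same process would each get their own. The trade-off is the extra registry and the need to choose a key that truly identifies a shared refresh token.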
    nonlocal refresh_call_count
    with refresh_call_lock:
        refresh_call_count += 1
    time.sleep(0.1)
Copilot
AI
Jan 27, 2026
Hard-coded sleep duration makes tests brittle and slower than necessary. Consider using a threading event or a smaller sleep duration with explicit synchronization to make the test both faster and more reliable.
    nonlocal refresh_call_count
    with refresh_call_lock:
        refresh_call_count += 1
    time.sleep(0.1)
Copilot
AI
Jan 27, 2026
Hard-coded sleep duration makes tests brittle and slower than necessary. Consider using a threading event or a smaller sleep duration with explicit synchronization to make the test both faster and more reliable.
The 0.1 second sleep is intentional to simulate a slow token refresh operation. This ensures that:
- Multiple threads have time to detect the expired token and attempt to refresh
- The threads pile up waiting for the lock, which is the scenario we're testing
Without some delay, the first thread might complete the refresh before others even start, making the test non-deterministic.
That said, 0.1 seconds is quite short (the test completes in ~0.3 seconds total), and using threading events would add complexity without significant benefit for this straightforward concurrency test. The current approach is simple, reliable, and fast enough.
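For comparison, the explicit-synchronization alternative raised in the review could be sketched as below. This is not the test in the PR: `run_refresh_race` and all names inside it are hypothetical, and the sketch replaces the sleep with a `threading.Condition` so that the simulated refresh only proceeds once every thread has performed its first expiry check.

```python
import threading


def run_refresh_race(num_threads=5, timeout=5.0):
    """Return (refresh_calls, tokens) for num_threads concurrent callers."""
    refresh_lock = threading.Lock()  # stands in for the class-level lock
    cond = threading.Condition()     # guards `seen`
    seen = set()                     # threads that have checked expiry at least once
    state = {"token": None, "expired": True, "refresh_calls": 0}

    def token_has_expired():
        with cond:
            seen.add(threading.get_ident())
            cond.notify_all()
        return state["expired"]

    def refresh_access_token():
        # Runs while refresh_lock is held. Wait until every thread has seen
        # the expired token, so all of them are headed for the lock.
        with cond:
            arrived = cond.wait_for(lambda: len(seen) == num_threads, timeout=timeout)
        assert arrived, "not every thread reached the expiry check"
        state["refresh_calls"] += 1
        return "new-token"

    def get_access_token():
        if token_has_expired():
            with refresh_lock:
                if token_has_expired():
                    state["token"] = refresh_access_token()
                    state["expired"] = False
        return state["token"]

    tokens = []
    threads = [
        threading.Thread(target=lambda: tokens.append(get_access_token()))
        for _ in range(num_threads)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["refresh_calls"], tokens
```

The pile-up on the lock is guaranteed by construction here rather than by timing, at the cost of more test scaffolding, which is the complexity trade-off the reply above describes.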
/prerelease
aldogonzalez8
left a comment
APPROVED
Just left a comment about PEP8.
    import threading
    import time
I think these imports should be at the top of the file.
Good catch! I've moved the threading and time imports to the top of the file per PEP8 conventions. Thanks for the review!
    import threading
    import time
Similar to above.
    with self._token_refresh_lock:
        # Double-check after acquiring lock - another thread may have already refreshed
        if self.token_has_expired():
            token, expires_in = self.refresh_access_token()
            self.access_token = token
            self.set_token_expiry_date(expires_in)
Just to confirm my understanding of the logic:
- Multiple streams/threads can detect that the access token has expired.
- Multiple streams could try to acquire the lock to refresh the access token.
- But now, only one will effectively do it after acquiring the lock, as it will double-check that no other stream/thread has already performed the task.
The above makes sense to me. Also the tests.
Yes, your understanding is exactly correct!
- Multiple streams/threads can detect that the access token has expired (first check without lock)
- Multiple streams could try to acquire the lock to refresh
- But only one will effectively do the refresh - after acquiring the lock, it double-checks if the token is still expired. If another thread already refreshed it, the token will now be valid and no refresh is needed.
This prevents the race condition where multiple threads would all try to use the same (now-invalidated) refresh token.
Address PR review feedback from aldogonzalez8 to follow PEP8 import conventions. Co-Authored-By: [email protected] <[email protected]>
Summary
Fixes a race condition where multiple streams starting concurrently with an expired OAuth token would all attempt to refresh simultaneously. For single-use refresh tokens, this causes failures because the refresh token is invalidated after the first successful refresh.
The fix adds a class-level threading.Lock to AbstractOauth2Authenticator and implements double-checked locking in get_access_token().

The same pattern is applied to SingleUseRefreshTokenOauth2Authenticator.get_access_token(), which overrides the base method.

Review & Testing Checklist for Human

- … refresh_access_token(). Test with an actual connector that has multiple streams and uses single-use refresh tokens (e.g., a connector where you've observed this issue).
- … refresh_access_token() directly, bypassing the lock protection.

Notes
Requested by: [email protected]
Link to Devin run: https://app.devin.ai/sessions/47c89308cb7f48b48d6bb424a393166e