Skip to content

Conversation

@devin-ai-integration
Copy link
Contributor

Summary

Fixes a race condition where multiple streams starting concurrently with an expired OAuth token would all attempt to refresh simultaneously. For single-use refresh tokens, this causes failures because the refresh token is invalidated after the first successful refresh.

The fix adds a class-level threading.Lock to AbstractOauth2Authenticator and implements double-checked locking in get_access_token():

  1. First check if token is expired (without lock - performance optimization)
  2. Acquire lock
  3. Re-check if token is still expired (another thread may have refreshed)
  4. If still expired, perform the refresh
  5. Release lock

The same pattern is applied to SingleUseRefreshTokenOauth2Authenticator.get_access_token() which overrides the base method.

Review & Testing Checklist for Human

  • Verify class-level lock scope is correct: The lock is shared across ALL authenticator instances. This is intentional because multiple streams share the same refresh token from the connector config. However, verify this won't cause issues if there are truly independent OAuth flows in the same process.
  • Test with a real connector: The unit tests mock refresh_access_token(). Test with an actual connector that has multiple streams and uses single-use refresh tokens (e.g., a connector where you've observed this issue).
  • Check for other code paths: Verify there are no other code paths that call refresh_access_token() directly, bypassing the lock protection.

Notes

  • The double-checked locking pattern is standard for this use case in Python
  • Unit tests verify that 5 concurrent threads result in exactly 1 refresh call
  • Lint and type checks pass locally

Requested by: [email protected]

Link to Devin run: https://app.devin.ai/sessions/47c89308cb7f48b48d6bb424a393166e

When multiple streams start concurrently with an expired token, they can
all detect the token is expired and attempt to refresh simultaneously.
For single-use refresh tokens, this causes failures because the refresh
token is invalidated after the first successful refresh.

This fix adds a class-level lock to AbstractOauth2Authenticator that
serializes token refresh attempts. The double-checked locking pattern
ensures that:
1. Only one thread performs the actual refresh
2. Other threads wait and use the newly refreshed token
3. No redundant refresh attempts occur

Changes:
- Add _token_refresh_lock class-level threading.Lock to AbstractOauth2Authenticator
- Update get_access_token() in AbstractOauth2Authenticator to use double-checked locking
- Update get_access_token() in SingleUseRefreshTokenOauth2Authenticator with same pattern
- Add unit tests for concurrent token refresh scenarios

Co-Authored-By: [email protected] <[email protected]>
@devin-ai-integration
Copy link
Contributor Author

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@github-actions
Copy link

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

💡 Show Tips and Tricks

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1769515185-thread-safe-oauth-refresh#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1769515185-thread-safe-oauth-refresh

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment
📚 Show Repo Guidance

Helpful Resources

📝 Edit this welcome message.

@github-actions
Copy link

github-actions bot commented Jan 27, 2026

PyTest Results (Fast)

3 828 tests  +2   3 816 ✅ +2   6m 28s ⏱️ -6s
    1 suites ±0      12 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit c881893. ± Comparison against base commit 9b23860.

♻️ This comment has been updated with latest results.

@github-actions
Copy link

github-actions bot commented Jan 27, 2026

PyTest Results (Full)

3 831 tests   3 819 ✅  10m 22s ⏱️
    1 suites     12 💤
    1 files        0 ❌

Results for commit c881893.

♻️ This comment has been updated with latest results.

@tolik0 tolik0 self-assigned this Jan 27, 2026
@tolik0 tolik0 marked this pull request as ready for review January 27, 2026 13:04
Copilot AI review requested due to automatic review settings January 27, 2026 13:04
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR fixes a race condition in OAuth token refresh by implementing thread-safe double-checked locking. When multiple streams start concurrently with an expired token, they would all attempt to refresh simultaneously, causing failures with single-use refresh tokens since the token is invalidated after the first successful refresh.

Changes:

  • Added a class-level threading.Lock to AbstractOauth2Authenticator to synchronize token refresh across all instances
  • Implemented double-checked locking pattern in both AbstractOauth2Authenticator.get_access_token() and SingleUseRefreshTokenOauth2Authenticator.get_access_token()
  • Added comprehensive unit tests verifying that 5 concurrent threads result in exactly 1 refresh call

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.

File Description
airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py Added class-level lock and double-checked locking in base authenticator
airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py Added double-checked locking to SingleUseRefreshTokenOauth2Authenticator
unit_tests/sources/streams/http/requests_native_auth/test_requests_native_auth.py Added concurrent token refresh tests for both authenticator types

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

# through the connector config. Without this lock, concurrent refresh attempts can cause race
# conditions where one stream successfully refreshes the token while others fail because the
# refresh token has been invalidated (especially for single-use refresh tokens).
_token_refresh_lock: threading.Lock = threading.Lock()
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The class-level lock creates a global bottleneck where ALL authenticator instances across different OAuth flows will block each other during token refresh. This means unrelated OAuth configurations (e.g., different API credentials for different connectors running in the same process) will unnecessarily wait for each other. Consider using an instance-level lock instead, or if multiple streams truly share the same credentials, document this assumption clearly and consider grouping authenticators by credential identity.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The class-level lock is intentional and necessary for this fix. Here's why:

The problem: Each stream gets its own authenticator instance, but all instances share the same refresh token from the connector config. When Stream A's authenticator refreshes the token, it invalidates the refresh token that Stream B's authenticator is also trying to use.

Why instance-level lock doesn't work: An instance-level lock would only protect concurrent calls to the same instance. Since each stream has its own authenticator instance, an instance-level lock would NOT prevent the race condition between different streams.

Why this isn't a problem in practice: Airbyte runs each connector in its own process/container, so different connectors won't share the same class. The lock only affects authenticator instances within the same connector process, which is exactly what we need.

The comment in the code documents this assumption. If there's ever a use case where truly independent OAuth flows run in the same process and shouldn't block each other, we could consider a more sophisticated approach (e.g., keying locks by credential identity), but that would add complexity for a scenario that doesn't currently exist in the Airbyte architecture.

nonlocal refresh_call_count
with refresh_call_lock:
refresh_call_count += 1
time.sleep(0.1)
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hard-coded sleep duration makes tests brittle and slower than necessary. Consider using a threading event or a smaller sleep duration with explicit synchronization to make the test both faster and more reliable.

Copilot uses AI. Check for mistakes.
nonlocal refresh_call_count
with refresh_call_lock:
refresh_call_count += 1
time.sleep(0.1)
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hard-coded sleep duration makes tests brittle and slower than necessary. Consider using a threading event or a smaller sleep duration with explicit synchronization to make the test both faster and more reliable.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The 0.1 second sleep is intentional to simulate a slow token refresh operation. This ensures that:

  1. Multiple threads have time to detect the expired token and attempt to refresh
  2. The threads pile up waiting for the lock, which is the scenario we're testing

Without some delay, the first thread might complete the refresh before others even start, making the test non-deterministic.

That said, 0.1 seconds is quite short (the test completes in ~0.3 seconds total), and using threading events would add complexity without significant benefit for this straightforward concurrency test. The current approach is simple, reliable, and fast enough.

@tolik0
Copy link
Contributor

tolik0 commented Jan 27, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/21398747886

Copy link
Contributor

@aldogonzalez8 aldogonzalez8 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

APPROVED
Just left a comment about PEP8.

Comment on lines 805 to 806
import threading
import time
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I these imports should be on top.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! I've moved the threading and time imports to the top of the file per PEP8 conventions. Thanks for the review!

Comment on lines 859 to 860
import threading
import time
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar to above

Comment on lines +106 to +111
with self._token_refresh_lock:
# Double-check after acquiring lock - another thread may have already refreshed
if self.token_has_expired():
token, expires_in = self.refresh_access_token()
self.access_token = token
self.set_token_expiry_date(expires_in)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm my understanding of the logic:

  1. Multiple streams/threads can detect that a refresh token has expired.
  2. Multiple streams could acquire the lock to try to refresh the access token.
  3. But now, only one will effectively do it after acquiring the lock, as it will double-check not other stream/thread has already performed the task.

The above makes sense to me. Also the tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, your understanding is exactly correct!

  1. Multiple streams/threads can detect that the access token has expired (first check without lock)
  2. Multiple streams could try to acquire the lock to refresh
  3. But only one will effectively do the refresh - after acquiring the lock, it double-checks if the token is still expired. If another thread already refreshed it, the token will now be valid and no refresh is needed.

This prevents the race condition where multiple threads would all try to use the same (now-invalidated) refresh token.

Address PR review feedback from aldogonzalez8 to follow PEP8 import conventions.

Co-Authored-By: [email protected] <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants