Skip to content

Refactor separating queue backend from batch management (ESB v3)#46

Open
mmenozzi wants to merge 16 commits intomasterfrom
queue-manger-race-condition-fix-v3
Open

Refactor separating queue backend from batch management (ESB v3)#46
mmenozzi wants to merge 16 commits intomasterfrom
queue-manger-race-condition-fix-v3

Conversation

@mmenozzi
Copy link
Member

@mmenozzi mmenozzi commented Jan 7, 2026

This is a more clean way to fix the same problem of the PR #45 but it will require a major version bump.

@mmenozzi mmenozzi added the 3.x label Jan 7, 2026
@mmenozzi mmenozzi force-pushed the queue-manger-race-condition-fix-v3 branch from 1dfac07 to 1d797a9 Compare January 7, 2026 17:24
@azambon azambon changed the title WIP refactor separating queue backend from batch management Refactor separating queue backend from batch management (ESB v3) Feb 2, 2026
@azambon
Copy link
Member

azambon commented Feb 2, 2026

Tests are red because if Roave BC break checks, but the tests themselves are green.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR refactors ESB v3 queueing by splitting the queue backend (Beanstalk + Elasticsearch operations) from producer-side batch buffering to address the concurrency issue described in PR #45, while also completing the release_delayerror_retry_delay rename and updating the supported PHP/tooling matrix.

Changes:

  • Introduce QueueBackendInterface + BeanstalkElasticsearchQueueBackend and move producer batching into a new BatchManager created per produceAndQueueJobs() call.
  • Rename worker retry delay config/log context from release_delay to error_retry_delay and remove deprecated config support/docs.
  • Update supported/runtime matrix (PHP 8.5, Ubuntu 24.04 CI) and remove deprecated pager exception shims / deprecated pager argument handling.

Reviewed changes

Copilot reviewed 25 out of 25 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
tests/Integration/FailingJobHandlingTest.php Updates config and log assertions to use error_retry_delay.
src/WorkerInstance.php Switches from queue manager to queue backend; updates retry delay log context key.
src/Service/QueueBackendInterface.php Renames/defines the unified backend interface and adds producer-oriented methods.
src/Service/BeanstalkElasticsearchQueueBackend.php Replaces QueueManager with a backend implementation; removes internal batching and adds enqueueJobs().
src/Service/BatchManagerInterface.php Renames the producer batching interface (producer-side).
src/Service/BatchManagerFactory.php Adds a factory to build per-call batch managers.
src/Service/BatchManager.php New batching implementation that buffers jobs and flushes via QueueBackendInterface.
src/ProducerInstance.php Uses per-call BatchManager instead of shared batching state to avoid concurrency issues.
src/Model/FlowConfig.php Removes fallback to deprecated release_delay, returning error_retry_delay only.
src/FlowExtension.php Updates Symfony DI wiring to use the new backend + batch manager factory per flow.
src/FlowConfiguration.php Removes deprecated release_delay config node.
src/Console/Pager/AsyncPager.php Removes deprecated boolean argument handling in setCurrentPage().
src/Console/Pager/*.php (removed) Drops deprecated pager exception shim classes for v3.
composer.json Expands PHP constraint to include 8.5.
README.md Updates requirements and removes deprecated release_delay docs.
Dockerfile Defaults container build ARG to PHP 8.5.
.github/workflows/ci.yaml Moves CI to Ubuntu 24.04 and adds PHP 8.5 to the matrix.
Comments suppressed due to low confidence (3)

src/Service/BeanstalkElasticsearchQueueBackend.php:22

  • DEFAULT_BATCH_ID is no longer used after moving batching out of the queue backend. Please remove this constant to avoid dead code.
    src/Service/QueueBackendInterface.php:72
  • enqueueJobs() is documented as JobInterface[], but BeanstalkElasticsearchQueueBackend::enqueueJobs() relies on unsetting by UUID key. Either tighten this contract to array<string, JobInterface> (UUID as key) or change implementations to not depend on array keys (e.g., filter by UUID set).
    src/Service/BeanstalkElasticsearchQueueBackend.php:247
  • This code unsets $jobs[$uuid] based on the bulk response _id, which only works if $jobs is keyed by UUID. If callers pass a numerically-indexed list (JobInterface[]), failed jobs won’t be removed and will still be put into Beanstalk. Consider filtering by UUID without relying on array keys, or enforce/validate array<string, JobInterface>.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

azambon and others added 3 commits February 9, 2026 12:01
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 25 out of 25 changed files in this pull request and generated 1 comment.

Comments suppressed due to low confidence (4)

src/Service/QueueBackendInterface.php:72

  • enqueueJobs() is documented as taking JobInterface[], but BeanstalkElasticsearchQueueBackend::enqueueJobs() relies on UUID keys to drop failed items. Update this PHPDoc to the intended shape (e.g. array<string, JobInterface>) or change the backend implementation to not depend on array keys.
    src/Service/BeanstalkElasticsearchQueueBackend.php:22
  • DEFAULT_BATCH_ID is no longer used after moving batching logic into BatchManager, but the constant remains declared. Removing it avoids dead code and confusion about where batching is handled.
    src/Service/BeanstalkElasticsearchQueueBackend.php:247
  • unset($jobs[$uuid]) assumes the $jobs array is keyed by job UUID, but the interface/docs describe it as a plain JobInterface[] list. If $jobs is numerically indexed, failed ES items won't be removed and will still be enqueued to Beanstalk. Either change the contract to array<string, JobInterface> or filter the list without relying on array keys.
    src/Service/BeanstalkElasticsearchQueueBackend.php:228
  • PHPDoc @param array<JobInterface> $jobs is not valid PHPDoc generic syntax for PHPStan/Psalm in this codebase. Use JobInterface[] or (preferably, given the UUID-keyed expectation) array<string, JobInterface> to match the actual shape.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 26 out of 26 changed files in this pull request and generated 2 comments.

Comments suppressed due to low confidence (2)

src/Service/BeanstalkElasticsearchQueueBackend.php:247

  • In the bulk-index error handling, jobs are removed from the $jobs array via unset($jobs[$uuid]), which only works if $jobs is keyed by UUID. If a caller passes a numerically indexed array (as allowed by the JobInterface[] PHPDoc), failed-to-index jobs will still be enqueued to Beanstalk, leading to buried jobs and worker errors when the job is missing in Elasticsearch. Consider normalizing $jobs to a uuid=>Job map at the start of the method, or filter out failed UUIDs without relying on array keys.
    src/Service/BeanstalkElasticsearchQueueBackend.php:21
  • DEFAULT_BATCH_ID is no longer used after moving batching out of this class. Keeping it is misleading and can be removed to avoid confusion.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +65 to 67
if (!($job = yield $this->queueBackend->getNextJob())) {
break;
}
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QueueBackendInterface::getNextJob() (and the current Beanstalk implementation) blocks until a job is available and never returns null, but this code treats a falsy result as a signal to break the worker loop. Either adjust the backend/interface contract to allow null (e.g., for timed polling/shutdown), or remove the null-check to avoid dead/unreachable control flow.

Suggested change
if (!($job = yield $this->queueBackend->getNextJob())) {
break;
}
$job = yield $this->queueBackend->getNextJob();

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants