Celery with GCP Pub/Sub Transport Fails When Used With Pub/Sub Emulator #2427

@NarendharManimohan

Description

Celery with the GCP Pub/Sub transport cannot be tested locally against the Pub/Sub emulator. The _size() method in the transport fails because it calls the Google Cloud Monitoring API, which the emulator does not support.

Current Behavior

When Celery runs with the gcpubsub transport against the Pub/Sub emulator (local development/testing), the _size() method queries the Google Cloud Monitoring API for queue message counts. The emulator does not support the Monitoring API, so the method fails or returns incorrect values.

# kombu/transport/gcpubsub.py
def _size(self, queue: str) -> int:
    """Return the number of messages in a queue."""
    queue = self.entity_name(queue)
    if queue not in self._queue_cache:
        return 0
    qdesc = self._queue_cache[queue]
    
    # This fails when using the emulator
    result = query.Query(
        self.monitor,
        self.project_id,
        'pubsub.googleapis.com/subscription/num_undelivered_messages',
        end_time=datetime.datetime.now(),
        minutes=1,
    ).select_resources(subscription_id=qdesc.subscription_id)
    # ...

Expected Behavior

When the PUBSUB_EMULATOR_HOST environment variable is set, the _size() method should skip the monitoring API call and either:

  1. Return -1 to indicate the size is unknown (consistent with permission denied behavior), or
  2. Use an alternative method to get the approximate message count if available
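Option 1 could be sketched roughly as below. This is an illustration only, not the actual Kombu patch: `using_pubsub_emulator`, `guarded_size`, and `UNKNOWN_SIZE` are hypothetical names, and `query_monitoring` stands in for whatever callable performs the existing Monitoring API lookup.

```python
import os

# Hypothetical sentinel: -1 signals "size unknown", matching option 1 above.
UNKNOWN_SIZE = -1


def using_pubsub_emulator(environ=None):
    """Return True when PUBSUB_EMULATOR_HOST is set to a non-empty value."""
    env = os.environ if environ is None else environ
    return bool(env.get("PUBSUB_EMULATOR_HOST"))


def guarded_size(query_monitoring, environ=None):
    """Skip the Monitoring API lookup when the emulator is in use.

    `query_monitoring` is an assumed zero-argument callable that performs
    the real lookup and returns an int; it is never called under the emulator.
    """
    if using_pubsub_emulator(environ):
        return UNKNOWN_SIZE
    return query_monitoring()
```

Inside the transport, the same check would presumably sit at the top of _size(), before the query.Query(...) call is built, so no Monitoring client or credentials are touched when the emulator is active.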

Steps to Reproduce

  1. Start the Pub/Sub emulator:

    gcloud beta emulators pubsub start --project=test-project
  2. Set the emulator environment variable:

    export PUBSUB_EMULATOR_HOST=localhost:8085
  3. Run Celery worker with gcpubsub transport:

    from celery import Celery

    app = Celery(__name__)
    app.conf.update(
        broker_url='gcpubsub://projects/test-project',
        task_default_queue='test-queue',
    )
  4. The worker fails with Monitoring API errors and expects Google credentials.

Environment

  • Kombu Version: 5.6.0
  • Python Version: 3.11+
  • GCP Pub/Sub Emulator: Latest
  • Celery Version: 5.5.3+
